2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2012-2013, Digium, Inc.
6 * Mark Michelson <mmmichelson@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
27 /* Needs to stay prime if increased */
28 #define THREAD_BUCKETS 89
31 * \brief An opaque threadpool structure
33 * A threadpool is a collection of threads that execute
34 * tasks from a common queue.
36 struct ast_threadpool {
37 /*! Threadpool listener */
38 struct ast_threadpool_listener *listener;
40 * \brief The container of active threads.
41 * Active threads are those that are currently running tasks
43 struct ao2_container *active_threads;
45 * \brief The container of idle threads.
46 * Idle threads are those that are currently waiting to run tasks
48 struct ao2_container *idle_threads;
50 * \brief The container of zombie threads.
51 * Zombie threads may be running tasks, but they are scheduled to die soon
53 struct ao2_container *zombie_threads;
55 * \brief The main taskprocessor
57 * Tasks that are queued in this taskprocessor are
58 * doled out to the worker threads. Worker threads that
59 * execute tasks from the threadpool are executing tasks
60 * in this taskprocessor.
62 * The threadpool itself is actually the private data for
63 * this taskprocessor's listener. This way, as taskprocessor
64 * changes occur, the threadpool can alert its listeners
67 struct ast_taskprocessor *tps;
69 * \brief The control taskprocessor
71 * This is a standard taskprocessor that uses the default
72 * taskprocessor listener. In other words, all tasks queued to
73 * this taskprocessor have a single thread that executes the
76 * All tasks that modify the state of the threadpool and all tasks
77 * that call out to threadpool listeners are pushed to this
80 * For instance, when the threadpool changes sizes, a task is put
81 * into this taskprocessor to do so. When it comes time to tell the
82 * threadpool listener that worker threads have changed state,
83 * the task is placed in this taskprocessor.
85 * This is done for three main reasons
86 * 1) It ensures that listeners are given an accurate portrayal
87 * of the threadpool's current state. In other words, when a listener
88 * gets told a count of active, idle and zombie threads, it does not
89 * need to worry that internal state of the threadpool might be different
90 * from what it has been told.
91 * 2) It minimizes the locking required in both the threadpool and in
92 * threadpool listener's callbacks.
93 * 3) It ensures that listener callbacks are called in the same order
94 * that the threadpool had its state change.
96 struct ast_taskprocessor *control_tps;
97 /*! True if the threadpool is in the process of shutting down */
99 /*! Threadpool-specific options */
100 struct ast_threadpool_options options;
104 * \brief listener for a threadpool
106 * The listener is notified of changes in a threadpool. It can
107 * react by doing things like increasing the number of threads
110 struct ast_threadpool_listener {
111 /*! Callbacks called by the threadpool */
112 const struct ast_threadpool_listener_callbacks *callbacks;
113 /*! User data for the listener */
118 * \brief states for worker threads
121 /*! The worker is either active or idle */
124 * The worker has been asked to shut down but
125 * may still be in the process of executing tasks.
126 * This transition happens when the threadpool needs
127 * to shrink and needs to kill active threads in order
132 * The worker has been asked to shut down. Typically
133 * only idle threads go to this state directly, but
134 * active threads may go straight to this state when
135 * the threadpool is shut down.
141 * A thread that executes threadpool tasks
143 struct worker_thread {
144 /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
146 /*! Condition used in conjunction with state changes */
148 /*! Lock used alongside the condition for state changes */
150 /*! The actual thread that is executing tasks */
152 /*! A pointer to the threadpool. Needed to be able to execute tasks */
153 struct ast_threadpool *pool;
154 /*! The current state of the worker thread */
155 enum worker_state state;
156 /*! A boolean used to determine if an idle thread should become active */
158 /*! Options for this threadpool */
159 struct ast_threadpool_options options;
162 /* Worker thread forward declarations. See definitions for documentation */
163 static int worker_thread_hash(const void *obj, int flags);
164 static int worker_thread_cmp(void *obj, void *arg, int flags);
165 static void worker_thread_destroy(void *obj);
166 static void worker_active(struct worker_thread *worker);
167 static void *worker_start(void *arg);
168 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
169 static int worker_thread_start(struct worker_thread *worker);
170 static int worker_idle(struct worker_thread *worker);
171 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
172 static void worker_shutdown(struct worker_thread *worker);
175 * \brief Notify the threadpool listener that the state has changed.
177 * This notifies the threadpool listener via its state_changed callback.
178 * \param pool The threadpool whose state has changed
180 static void threadpool_send_state_changed(struct ast_threadpool *pool)
182 int active_size = ao2_container_count(pool->active_threads);
183 int idle_size = ao2_container_count(pool->idle_threads);
185 if (pool->listener && pool->listener->callbacks->state_changed) {
186 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
191 * \brief Struct used for queued operations involving worker state changes
193 struct thread_worker_pair {
194 /*! Threadpool that contains the worker whose state has changed */
195 struct ast_threadpool *pool;
196 /*! Worker whose state has changed */
197 struct worker_thread *worker;
201 * \brief Destructor for thread_worker_pair
203 static void thread_worker_pair_destructor(void *obj)
205 struct thread_worker_pair *pair = obj;
206 ao2_ref(pair->worker, -1);
210 * \brief Allocate and initialize a thread_worker_pair
211 * \param pool Threadpool to assign to the thread_worker_pair
212 * \param worker Worker thread to assign to the thread_worker_pair
214 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
215 struct worker_thread *worker)
217 struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
223 pair->worker = worker;
228 * \brief Move a worker thread from the active container to the idle container.
230 * This function is called from the threadpool's control taskprocessor thread.
231 * \param data A thread_worker_pair containing the threadpool and the worker to move.
234 static int queued_active_thread_idle(void *data)
236 struct thread_worker_pair *pair = data;
238 ao2_link(pair->pool->idle_threads, pair->worker);
239 ao2_unlink(pair->pool->active_threads, pair->worker);
241 threadpool_send_state_changed(pair->pool);
248 * \brief Queue a task to move a thread from the active list to the idle list
250 * This is called by a worker thread when it runs out of tasks to perform and
252 * \param pool The threadpool to which the worker belongs
253 * \param worker The worker thread that has gone idle
255 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
256 struct worker_thread *worker)
258 struct thread_worker_pair *pair;
259 SCOPED_AO2LOCK(lock, pool);
260 if (pool->shutting_down) {
263 pair = thread_worker_pair_alloc(pool, worker);
267 ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
271 * \brief Kill a zombie thread
273 * This runs from the threadpool's control taskprocessor thread.
275 * \param data A thread_worker_pair containing the threadpool and the zombie thread
278 static int queued_zombie_thread_dead(void *data)
280 struct thread_worker_pair *pair = data;
282 ao2_unlink(pair->pool->zombie_threads, pair->worker);
283 threadpool_send_state_changed(pair->pool);
290 * \brief Queue a task to kill a zombie thread
292 * This is called by a worker thread when it acknowledges that it is time for
295 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
296 struct worker_thread *worker)
298 struct thread_worker_pair *pair;
299 SCOPED_AO2LOCK(lock, pool);
300 if (pool->shutting_down) {
303 pair = thread_worker_pair_alloc(pool, worker);
307 ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
310 static int queued_idle_thread_dead(void *data)
312 struct thread_worker_pair *pair = data;
314 ao2_unlink(pair->pool->idle_threads, pair->worker);
315 threadpool_send_state_changed(pair->pool);
321 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
322 struct worker_thread *worker)
324 struct thread_worker_pair *pair;
325 SCOPED_AO2LOCK(lock, pool);
326 if (pool->shutting_down) {
329 pair = thread_worker_pair_alloc(pool, worker);
333 ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
337 * \brief Execute a task in the threadpool
339 * This is the function that worker threads call in order to execute tasks
342 * \param pool The pool to which the tasks belong.
343 * \retval 0 Either the pool has been shut down or there are no tasks.
344 * \retval 1 There are still tasks remaining in the pool.
346 static int threadpool_execute(struct ast_threadpool *pool)
349 if (!pool->shutting_down) {
351 return ast_taskprocessor_execute(pool->tps);
358 * \brief Destroy a threadpool's components.
360 * This is the destructor called automatically when the threadpool's
361 * reference count reaches zero. This is not to be confused with
362 * threadpool_destroy.
364 * By the time this actually gets called, most of the cleanup has already
365 * been done in the pool. The only thing left to do is to release the
366 * final reference to the threadpool listener.
368 * \param obj The pool to destroy
370 static void threadpool_destructor(void *obj)
372 struct ast_threadpool *pool = obj;
373 ao2_cleanup(pool->listener);
377 * \brief Allocate a threadpool
379 * This is implemented as a taskprocessor listener's alloc callback. This
380 * is because the threadpool exists as the private data on a taskprocessor
383 * \param name The name of the threadpool.
384 * \param options The options the threadpool uses.
385 * \retval NULL Could not initialize threadpool properly
386 * \retval non-NULL The newly-allocated threadpool
388 static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
390 RAII_VAR(struct ast_threadpool *, pool,
391 ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
392 struct ast_str *control_tps_name = ast_str_create(64);
394 if (!control_tps_name) {
398 ast_str_set(&control_tps_name, 0, "%s-control", name);
400 pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
401 ast_free(control_tps_name);
402 if (!pool->control_tps) {
405 pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
406 if (!pool->active_threads) {
409 pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
410 if (!pool->idle_threads) {
413 pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
414 if (!pool->zombie_threads) {
417 pool->options = *options;
423 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
429 * \brief helper used for queued task when tasks are pushed
431 struct task_pushed_data {
432 /*! Pool into which a task was pushed */
433 struct ast_threadpool *pool;
434 /*! Indicator of whether the pool had no tasks prior to the new task being added */
439 * \brief Allocate and initialize a task_pushed_data
440 * \param pool The threadpool to set in the task_pushed_data
441 * \param was_empty The was_empty value to set in the task_pushed_data
442 * \retval NULL Unable to allocate task_pushed_data
443 * \retval non-NULL The newly-allocated task_pushed_data
445 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
448 struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
454 tpd->was_empty = was_empty;
459 * \brief Activate idle threads
461 * This function always returns CMP_MATCH because all workers that this
462 * function acts on need to be seen as matches so they are unlinked from the
463 * list of idle threads.
465 * Called as an ao2_callback in the threadpool's control taskprocessor thread.
466 * \param obj The worker to activate
467 * \param arg The pool where the worker belongs
470 static int activate_thread(void *obj, void *arg, int flags)
472 struct worker_thread *worker = obj;
473 struct ast_threadpool *pool = arg;
475 if (!ao2_link(pool->active_threads, worker)) {
476 /* If we can't link the idle thread into the active container, then
477 * we'll just leave the thread idle and not wake it up.
479 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
483 worker_set_state(worker, ALIVE);
488 * \brief Add threads to the threadpool
490 * This function is called from the threadpool's control taskprocessor thread.
491 * \param pool The pool that is expanding
492 * \param delta The number of threads to add to the pool
494 static void grow(struct ast_threadpool *pool, int delta)
498 int current_size = ao2_container_count(pool->active_threads) +
499 ao2_container_count(pool->idle_threads);
501 if (pool->options.max_size && current_size + delta > pool->options.max_size) {
502 delta = pool->options.max_size - current_size;
505 ast_debug(3, "Increasing threadpool %s's size by %d\n",
506 ast_taskprocessor_name(pool->tps), delta);
508 for (i = 0; i < delta; ++i) {
509 struct worker_thread *worker = worker_thread_alloc(pool);
513 if (ao2_link(pool->active_threads, worker)) {
514 if (worker_thread_start(worker)) {
515 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
516 ao2_unlink(pool->active_threads, worker);
519 ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
526 * \brief Queued task called when tasks are pushed into the threadpool
528 * This function first calls into the threadpool's listener to let it know
529 * that a task has been pushed. It then wakes up all idle threads and moves
530 * them into the active thread container.
531 * \param data A task_pushed_data
534 static int queued_task_pushed(void *data)
536 struct task_pushed_data *tpd = data;
537 struct ast_threadpool *pool = tpd->pool;
538 int was_empty = tpd->was_empty;
541 if (pool->listener && pool->listener->callbacks->task_pushed) {
542 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
544 if (ao2_container_count(pool->idle_threads) == 0) {
545 if (pool->options.auto_increment > 0) {
546 grow(pool, pool->options.auto_increment);
550 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
551 activate_thread, pool);
555 threadpool_send_state_changed(pool);
562 * \brief Taskprocessor listener callback called when a task is added
564 * The threadpool uses this opportunity to queue a task on its control taskprocessor
565 * in order to activate idle threads and notify the threadpool listener that the
566 * task has been pushed.
567 * \param listener The taskprocessor listener. The threadpool is the listener's private data
568 * \param was_empty True if the taskprocessor was empty prior to the task being pushed
570 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
573 struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
574 struct task_pushed_data *tpd;
575 SCOPED_AO2LOCK(lock, pool);
577 if (pool->shutting_down) {
580 tpd = task_pushed_data_alloc(pool, was_empty);
585 ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
589 * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
591 * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
592 * \param data The pool that has become empty
595 static int queued_emptied(void *data)
597 struct ast_threadpool *pool = data;
599 /* We already checked for existence of this callback when this was queued */
600 pool->listener->callbacks->emptied(pool, pool->listener);
605 * \brief Taskprocessor listener emptied callback
607 * The threadpool queues a task to let the threadpool listener know that
608 * the threadpool no longer contains any tasks.
609 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
611 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
613 struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
614 SCOPED_AO2LOCK(lock, pool);
616 if (pool->shutting_down) {
620 if (pool->listener && pool->listener->callbacks->emptied) {
621 ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
626 * \brief Taskprocessor listener shutdown callback
628 * The threadpool will shut down and destroy all of its worker threads when
629 * this is called back. By the time this gets called, the threadpool's
630 * control taskprocessor has already been destroyed. Therefore there is no risk
631 * in outright destroying the worker threads here.
632 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
634 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
636 struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
638 if (pool->listener && pool->listener->callbacks->shutdown) {
639 pool->listener->callbacks->shutdown(pool->listener);
641 ao2_cleanup(pool->active_threads);
642 ao2_cleanup(pool->idle_threads);
643 ao2_cleanup(pool->zombie_threads);
648 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
650 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
651 .start = threadpool_tps_start,
652 .task_pushed = threadpool_tps_task_pushed,
653 .emptied = threadpool_tps_emptied,
654 .shutdown = threadpool_tps_shutdown,
658 * \brief ao2 callback to kill a set number of threads.
660 * Threads will be unlinked from the container as long as the
661 * counter has not reached zero. The counter is decremented with
662 * each thread that is removed.
663 * \param obj The worker thread up for possible destruction
664 * \param arg The counter
665 * \param flags Unused
666 * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
667 * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
669 static int kill_threads(void *obj, void *arg, int flags)
671 int *num_to_kill = arg;
673 if (*num_to_kill > 0) {
682 * \brief ao2 callback to zombify a set number of threads.
684 * Threads will be zombified as long as the counter has not reached
685 * zero. The counter is decremented with each thread that is zombified.
687 * Zombifying a thread involves removing it from its current container,
688 * adding it to the zombie container, and changing the state of the
691 * This callback is called from the threadpool control taskprocessor thread.
693 * \param obj The worker thread that may be zombified
694 * \param arg The pool to which the worker belongs
695 * \param data The counter
696 * \param flags Unused
697 * \retval CMP_MATCH The zombified thread should be removed from its current container
698 * \retval CMP_STOP Stop attempting to zombify threads
700 static int zombify_threads(void *obj, void *arg, void *data, int flags)
702 struct worker_thread *worker = obj;
703 struct ast_threadpool *pool = arg;
704 int *num_to_zombify = data;
706 if ((*num_to_zombify)-- > 0) {
707 if (!ao2_link(pool->zombie_threads, worker)) {
708 ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
711 worker_set_state(worker, ZOMBIE);
719 * \brief Remove threads from the threadpool
721 * The preference is to kill idle threads. However, if there are
722 * more threads to remove than there are idle threads, then active
723 * threads will be zombified instead.
725 * This function is called from the threadpool control taskprocessor thread.
727 * \param pool The threadpool to remove threads from
728 * \param delta The number of threads to remove
730 static void shrink(struct ast_threadpool *pool, int delta)
733 * Preference is to kill idle threads, but
734 * we'll move on to deactivating active threads
737 int idle_threads = ao2_container_count(pool->idle_threads);
738 int idle_threads_to_kill = MIN(delta, idle_threads);
739 int active_threads_to_zombify = delta - idle_threads_to_kill;
741 ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
742 ast_taskprocessor_name(pool->tps));
744 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
745 kill_threads, &idle_threads_to_kill);
747 ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
748 ast_taskprocessor_name(pool->tps));
750 ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
751 zombify_threads, pool, &active_threads_to_zombify);
755 * \brief Helper struct used for queued operations that change the size of the threadpool
757 struct set_size_data {
758 /*! The pool whose size is to change */
759 struct ast_threadpool *pool;
760 /*! The requested new size of the pool */
765 * \brief Allocate and initialize a set_size_data
766 * \param pool The pool for the set_size_data
767 * \param size The size to store in the set_size_data
769 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
772 struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
783 * \brief Change the size of the threadpool
785 * This can either result in shrinking or growing the threadpool depending
786 * on the new desired size and the current size.
788 * This function is run from the threadpool control taskprocessor thread
790 * \param data A set_size_data used for determining how to act
793 static int queued_set_size(void *data)
795 struct set_size_data *ssd = data;
796 struct ast_threadpool *pool = ssd->pool;
797 unsigned int num_threads = ssd->size;
799 /* We don't count zombie threads as being "live" when potentially resizing */
800 unsigned int current_size = ao2_container_count(pool->active_threads) +
801 ao2_container_count(pool->idle_threads);
803 if (current_size == num_threads) {
804 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
805 num_threads, current_size);
809 if (current_size < num_threads) {
810 grow(pool, num_threads - current_size);
812 shrink(pool, current_size - num_threads);
815 threadpool_send_state_changed(pool);
820 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
822 struct set_size_data *ssd;
823 SCOPED_AO2LOCK(lock, pool);
824 if (pool->shutting_down) {
828 ssd = set_size_data_alloc(pool, size);
833 ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
836 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
837 const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
839 struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
843 listener->callbacks = callbacks;
844 listener->user_data = user_data;
848 void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
850 return listener->user_data;
853 struct pool_options_pair {
854 struct ast_threadpool *pool;
855 struct ast_threadpool_options options;
858 struct ast_threadpool *ast_threadpool_create(const char *name,
859 struct ast_threadpool_listener *listener,
860 const struct ast_threadpool_options *options)
862 struct ast_taskprocessor *tps;
863 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
864 RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup);
870 tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
875 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
876 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
880 tps = ast_taskprocessor_create_with_listener(name, tps_listener);
887 ao2_ref(listener, +1);
888 pool->listener = listener;
890 ast_threadpool_set_size(pool, pool->options.initial_size);
895 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
897 SCOPED_AO2LOCK(lock, pool);
898 if (!pool->shutting_down) {
899 return ast_taskprocessor_push(pool->tps, task, data);
904 void ast_threadpool_shutdown(struct ast_threadpool *pool)
909 /* Shut down the taskprocessors and everything else just
910 * takes care of itself via the taskprocessor callbacks
913 pool->shutting_down = 1;
915 ast_taskprocessor_unreference(pool->control_tps);
916 ast_taskprocessor_unreference(pool->tps);
920 * A monotonically increasing integer used for worker
921 * thread identification.
923 static int worker_id_counter;
925 static int worker_thread_hash(const void *obj, int flags)
927 const struct worker_thread *worker = obj;
932 static int worker_thread_cmp(void *obj, void *arg, int flags)
934 struct worker_thread *worker1 = obj;
935 struct worker_thread *worker2 = arg;
937 return worker1->id == worker2->id ? CMP_MATCH : 0;
941 * \brief shut a worker thread down
943 * Set the worker dead and then wait for its thread
944 * to finish executing.
946 * \param worker The worker thread to shut down
948 static void worker_shutdown(struct worker_thread *worker)
950 worker_set_state(worker, DEAD);
951 if (worker->thread != AST_PTHREADT_NULL) {
952 pthread_join(worker->thread, NULL);
953 worker->thread = AST_PTHREADT_NULL;
958 * \brief Worker thread destructor
960 * Called automatically when refcount reaches 0. Shuts
961 * down the worker thread and destroys its component
964 static void worker_thread_destroy(void *obj)
966 struct worker_thread *worker = obj;
967 ast_debug(3, "Destroying worker thread %d\n", worker->id);
968 worker_shutdown(worker);
969 ast_mutex_destroy(&worker->lock);
970 ast_cond_destroy(&worker->cond);
974 * \brief start point for worker threads
976 * Worker threads start in the active state but may
977 * immediately go idle if there is no work to be
980 * \param arg The worker thread
983 static void *worker_start(void *arg)
985 struct worker_thread *worker = arg;
987 worker_active(worker);
992 * \brief Allocate and initialize a new worker thread
994 * This allocates and initializes the worker; worker_thread_start() starts its thread.
996 * \param pool The threadpool to which the worker will be added
997 * \retval NULL Failed to allocate the worker thread
998 * \retval non-NULL The newly-created worker thread
1000 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
1002 struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1006 worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
1007 ast_mutex_init(&worker->lock);
1008 ast_cond_init(&worker->cond, NULL);
1009 worker->pool = pool;
1010 worker->thread = AST_PTHREADT_NULL;
1011 worker->state = ALIVE;
1012 worker->options = pool->options;
1016 static int worker_thread_start(struct worker_thread *worker)
1018 return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1022 * \brief Active loop for worker threads
1024 * The worker will stay in this loop for its lifetime,
1025 * executing tasks as they become available. If there
1026 * are no tasks currently available, then the thread
1029 * \param worker The worker thread executing tasks.
1031 static void worker_active(struct worker_thread *worker)
1035 if (!threadpool_execute(worker->pool)) {
1036 alive = worker_idle(worker);
1040 /* Reaching this portion means the thread is
1041 * on death's door. It may have been killed while
1042 * it was idle, in which case it can just die
1043 * peacefully. If it's a zombie, though, then
1044 * it needs to let the pool know so
1045 * that the thread can be removed from the
1046 * list of zombie threads.
1048 if (worker->state == ZOMBIE) {
1049 threadpool_zombie_thread_dead(worker->pool, worker);
1054 * \brief Idle function for worker threads
1056 * The worker waits here until it gets told by the threadpool
1059 * \param worker The idle worker
1060 * \retval 0 The thread is being woken up so that it can conclude.
1061 * \retval non-zero The thread is being woken up to do more work.
1063 static int worker_idle(struct worker_thread *worker)
1065 struct timeval start = ast_tvnow();
1066 struct timespec end = {
1067 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1068 .tv_nsec = start.tv_usec * 1000,
1070 SCOPED_MUTEX(lock, &worker->lock);
1071 if (worker->state != ALIVE) {
1074 threadpool_active_thread_idle(worker->pool, worker);
1075 while (!worker->wake_up) {
1076 if (worker->options.idle_timeout <= 0) {
1077 ast_cond_wait(&worker->cond, lock);
1078 } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
1083 if (!worker->wake_up) {
1084 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1085 threadpool_idle_thread_dead(worker->pool, worker);
1086 worker->state = DEAD;
1088 worker->wake_up = 0;
1089 return worker->state == ALIVE;
1093 * \brief Change a worker's state
1095 * The threadpool calls into this function in order to let a worker know
1096 * how it should proceed.
1098 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
1100 SCOPED_MUTEX(lock, &worker->lock);
1101 worker->state = state;
1102 worker->wake_up = 1;
1103 ast_cond_signal(&worker->cond);
1107 struct ast_threadpool *pool;
1110 static void serializer_dtor(void *obj)
1112 struct serializer *ser = obj;
1113 ao2_cleanup(ser->pool);
1117 static struct serializer *serializer_create(struct ast_threadpool *pool)
1119 struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor);
1128 static int execute_tasks(void *data)
1130 struct ast_taskprocessor *tps = data;
1132 while (ast_taskprocessor_execute(tps)) {
1136 ast_taskprocessor_unreference(tps);
1140 static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
1143 struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1144 struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
1146 ast_threadpool_push(ser->pool, execute_tasks, tps);
1150 static int serializer_start(struct ast_taskprocessor_listener *listener)
1156 static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
1158 struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1162 static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
1163 .task_pushed = serializer_task_pushed,
1164 .start = serializer_start,
1165 .shutdown = serializer_shutdown,
1168 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
1170 RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
1171 RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
1172 struct ast_taskprocessor *tps = NULL;
1174 ser = serializer_create(pool);
1179 listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
1183 ser = NULL; /* ownership transferred to listener */
1185 tps = ast_taskprocessor_create_with_listener(name, listener);
1189 listener = NULL; /* ownership transferred to tps */