/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2012, Digium, Inc.
 *
 * Mark Michelson <mmmichelson@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
/*! \brief Bucket count passed to ao2_container_alloc() for the active and idle worker containers */
#define THREAD_BUCKETS 89
/*!
 * \brief An opaque threadpool structure
 *
 * A threadpool is a collection of threads that execute
 * tasks from a common queue.
 */
struct ast_threadpool {
	/*! Listener that is told about changes in the threadpool */
	struct ast_threadpool_listener *listener;
	/*!
	 * \brief Container of active threads
	 *
	 * Active threads are those that are currently running tasks.
	 */
	struct ao2_container *active_threads;
	/*!
	 * \brief Container of idle threads
	 *
	 * Idle threads are those that are currently waiting to run tasks.
	 */
	struct ao2_container *idle_threads;
	/*!
	 * \brief The main taskprocessor
	 *
	 * Tasks queued here are the ones doled out to the worker threads;
	 * a worker that executes a threadpool task is executing a task
	 * from this taskprocessor.
	 *
	 * The threadpool is the private data of this taskprocessor's
	 * listener. That way, as the taskprocessor changes, the threadpool
	 * can alert its own listener appropriately.
	 */
	struct ast_taskprocessor *tps;
	/*!
	 * \brief The control taskprocessor
	 *
	 * A standard taskprocessor using the default taskprocessor
	 * listener, i.e. a single thread executes everything queued to it.
	 *
	 * Every task that modifies the state of the threadpool, and every
	 * task that calls out to the threadpool listener, is pushed here.
	 * Resizing the pool and reporting worker state changes to the
	 * listener are both done this way.
	 *
	 * Reasons for funnelling this work through one thread:
	 * 1) Listeners get an accurate portrayal of the pool's state. The
	 *    active/idle counts a listener is given cannot be stale with
	 *    respect to the pool's internal state.
	 * 2) It minimizes the locking required in the threadpool and in
	 *    the threadpool listener's callbacks.
	 * 3) Listener callbacks are invoked in the same order in which
	 *    the threadpool's state changed.
	 */
	struct ast_taskprocessor *control_tps;
};
/*!
 * \brief States for worker threads
 */
enum worker_state {
	/*! The worker is either active or idle */
	ALIVE,
	/*! The worker has been asked to shut down */
	DEAD,
};
/* Worker thread forward declarations. See the definitions for documentation. */
struct worker_thread;

/* ao2 hash / compare / destructor hooks */
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);

/* Worker creation and thread body */
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static void *worker_start(void *arg);
static void worker_active(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);

/* Worker state control */
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
116 * \brief Notify the threadpool listener that the state has changed.
118 * This notifies the threadpool listener via its state_changed callback.
119 * \param pool The threadpool whose state has changed
121 static void threadpool_send_state_changed(struct ast_threadpool *pool)
123 int active_size = ao2_container_count(pool->active_threads);
124 int idle_size = ao2_container_count(pool->idle_threads);
126 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
/*!
 * \brief Payload for queued operations that involve a worker state change
 */
struct thread_worker_pair {
	/*! The threadpool containing the worker whose state changed */
	struct ast_threadpool *pool;
	/*! The worker whose state changed */
	struct worker_thread *worker;
};
140 * \brief Destructor for thread_worker_pair
142 static void thread_worker_pair_destructor(void *obj)
144 struct thread_worker_pair *pair = obj;
145 ao2_ref(pair->worker, -1);
149 * \brief Allocate and initialize a thread_worker_pair
150 * \param pool Threadpool to assign to the thread_worker_pair
151 * \param worker Worker thread to assign to the thread_worker_pair
153 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
154 struct worker_thread *worker)
156 struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
162 pair->worker = worker;
167 * \brief Move a worker thread from the active container to the idle container.
169 * This function is called from the threadpool's control taskprocessor thread.
170 * \param data A thread_worker_pair containing the threadpool and the worker to move.
173 static int queued_active_thread_idle(void *data)
175 struct thread_worker_pair *pair = data;
177 ao2_link(pair->pool->idle_threads, pair->worker);
178 ao2_unlink(pair->pool->active_threads, pair->worker);
180 threadpool_send_state_changed(pair->pool);
187 * \brief Queue a task to move a thread from the active list to the idle list
189 * This is called by a worker thread when it runs out of tasks to perform and
191 * \param pool The threadpool to which the worker belongs
192 * \param worker The worker thread that has gone idle
194 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
195 struct worker_thread *worker)
197 struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
201 ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
205 * \brief Execute a task in the threadpool
207 * This is the function that worker threads call in order to execute tasks
210 * \param pool The pool to which the tasks belong.
211 * \retval 0 Either the pool has been shut down or there are no tasks.
212 * \retval 1 There are still tasks remaining in the pool.
214 static int threadpool_execute(struct ast_threadpool *pool)
216 return ast_taskprocessor_execute(pool->tps);
220 * \brief Destroy a threadpool's components.
222 * This is the destructor called automatically when the threadpool's
223 * reference count reaches zero. This is not to be confused with
224 * threadpool_destroy.
226 * By the time this actually gets called, most of the cleanup has already
227 * been done in the pool. The only thing left to do is to release the
228 * final reference to the threadpool listener.
230 * \param obj The pool to destroy
232 static void threadpool_destructor(void *obj)
234 struct ast_threadpool *pool = obj;
235 ao2_cleanup(pool->listener);
239 * \brief Allocate a threadpool
241 * This is implemented as a taskprocessor listener's alloc callback. This
242 * is because the threadpool exists as the private data on a taskprocessor
245 * \param listener The taskprocessor listener where the threadpool will live.
246 * \retval NULL Could not initialize threadpool properly
247 * \retval non-NULL The newly-allocated threadpool
249 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
251 RAII_VAR(struct ast_threadpool *, pool,
252 ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
254 pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
255 if (!pool->control_tps) {
258 pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
259 if (!pool->active_threads) {
262 pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
263 if (!pool->idle_threads) {
267 pool->tps = listener->tps;
/*!
 * \brief Payload for the queued task that runs when tasks are pushed
 */
struct task_pushed_data {
	/*! The pool into which a task was pushed */
	struct ast_threadpool *pool;
	/*! Whether the pool had no tasks before the new task was added */
	int was_empty;
};
284 * \brief Allocate and initialize a task_pushed_data
285 * \param pool The threadpool to set in the task_pushed_data
286 * \param was_empty The was_empty value to set in the task_pushed_data
287 * \retval NULL Unable to allocate task_pushed_data
288 * \retval non-NULL The newly-allocated task_pushed_data
290 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
293 struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
299 tpd->was_empty = was_empty;
304 * \brief Activate idle threads
306 * This function always returns CMP_MATCH because all threads that this
307 * function acts on need to be seen as matches so they are unlinked from the
308 * list of idle threads.
310 * Called as an ao2_callback in the threadpool's control taskprocessor thread.
311 * \param obj The worker to activate
312 * \param arg The pool where the worker belongs
315 static int activate_threads(void *obj, void *arg, int flags)
317 struct worker_thread *worker = obj;
318 struct ast_threadpool *pool = arg;
320 ao2_link(pool->active_threads, worker);
321 worker_set_state(worker, ALIVE);
326 * \brief Queue task called when tasks are pushed into the threadpool
328 * This function first calls into the threadpool's listener to let it know
329 * that a task has been pushed. It then wakes up all idle threads and moves
330 * them into the active thread container.
331 * \param data A task_pushed_data
334 static int handle_task_pushed(void *data)
336 struct task_pushed_data *tpd = data;
337 struct ast_threadpool *pool = tpd->pool;
338 int was_empty = tpd->was_empty;
340 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
341 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
342 activate_threads, pool);
348 * \brief Taskprocessor listener callback called when a task is added
350 * The threadpool uses this opportunity to queue a task on its control taskprocessor
351 * in order to activate idle threads and notify the threadpool listener that the
352 * task has been pushed.
353 * \param listener The taskprocessor listener. The threadpool is the listener's private data
354 * \param was_empty True if the taskprocessor was empty prior to the task being pushed
356 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
359 struct ast_threadpool *pool = listener->private_data;
360 struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
366 ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
370 * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
372 * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
373 * \param data The pool that has become empty
376 static int handle_emptied(void *data)
378 struct ast_threadpool *pool = data;
380 pool->listener->callbacks->emptied(pool, pool->listener);
385 * \brief Taskprocessor listener emptied callback
387 * The threadpool queues a task to let the threadpool listener know that
388 * the threadpool no longer contains any tasks.
389 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
391 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
393 struct ast_threadpool *pool = listener->private_data;
395 ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
399 * \brief Taskprocessor listener shutdown callback
401 * The threadpool will shut down and destroy all of its worker threads when
402 * this is called back. By the time this gets called, the taskprocessor's
403 * control taskprocessor has already been destroyed. Therefore there is no risk
404 * in outright destroying the worker threads here.
405 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
407 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
409 struct ast_threadpool *pool = listener->private_data;
411 ao2_cleanup(pool->active_threads);
412 ao2_cleanup(pool->idle_threads);
/*!
 * \brief Taskprocessor listener destroy callback
 *
 * The threadpool is an ao2 object, so all that is necessary is to decrease
 * its refcount. The control taskprocessor should already be destroyed by
 * this point, so this should be the final reference to the threadpool.
 *
 * \param private_data The threadpool to destroy
 */
static void threadpool_destroy(void *private_data)
{
	struct ast_threadpool *finished_pool = private_data;

	ao2_cleanup(finished_pool);
}
432 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
434 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
435 .alloc = threadpool_alloc,
436 .task_pushed = threadpool_tps_task_pushed,
437 .emptied = threadpool_tps_emptied,
438 .shutdown = threadpool_tps_shutdown,
439 .destroy = threadpool_destroy,
443 * \brief Add threads to the threadpool
445 * This function is called from the threadpool's control taskprocessor thread.
446 * \param pool The pool that is expanding
447 * \delta The number of threads to add to the pool
449 static void grow(struct ast_threadpool *pool, int delta)
452 for (i = 0; i < delta; ++i) {
453 struct worker_thread *worker = worker_thread_alloc(pool);
457 ao2_link(pool->active_threads, worker);
462 * \brief ao2 callback to kill a set number of threads.
464 * Threads will be unlinked from the container as long as the
465 * counter has not reached zero. The counter is decremented with
466 * each thread that is removed.
467 * \param obj The worker thread up for possible destruction
468 * \param arg The counter
469 * \param flags Unused
470 * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
471 * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
473 static int kill_threads(void *obj, void *arg, int flags)
475 int *num_to_kill = arg;
477 if ((*num_to_kill)-- > 0) {
485 * \brief Remove threads from the threadpool
487 * The preference is to kill idle threads. However, if there are
488 * more threads to remove than there are idle threads, then active
489 * threads will be removed too.
491 * This function is called from the threadpool control taskprocessor thread.
493 * \param pool The threadpool to remove threads from
494 * \param delta The number of threads to remove
496 static void shrink(struct ast_threadpool *pool, int delta)
498 int idle_threads = ao2_container_count(pool->idle_threads);
499 int idle_threads_to_kill = MIN(delta, idle_threads);
500 int active_threads_to_kill = delta - idle_threads_to_kill;
502 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
503 kill_threads, &idle_threads_to_kill);
505 ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
506 kill_threads, &active_threads_to_kill);
/*!
 * \brief Payload for queued operations that change the size of the threadpool
 */
struct set_size_data {
	/*! The pool whose size is to change */
	struct ast_threadpool *pool;
	/*! The requested new size of the pool */
	unsigned int size;
};
520 * \brief Allocate and initialize a set_size_data
521 * \param pool The pool for the set_size_data
522 * \param size The size to store in the set_size_data
524 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
527 struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
538 * \brief Change the size of the threadpool
540 * This can either result in shrinking or growing the threadpool depending
541 * on the new desired size and the current size.
543 * This function is run from the threadpool control taskprocessor thread
545 * \param data A set_size_data used for determining how to act
548 static int queued_set_size(void *data)
550 struct set_size_data *ssd = data;
551 struct ast_threadpool *pool = ssd->pool;
552 unsigned int new_size = ssd->size;
553 unsigned int current_size = ao2_container_count(pool->active_threads) +
554 ao2_container_count(pool->idle_threads);
556 if (current_size == new_size) {
557 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
558 new_size, current_size);
562 if (current_size < new_size) {
563 grow(pool, new_size - current_size);
565 shrink(pool, current_size - new_size);
568 threadpool_send_state_changed(pool);
573 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
575 struct set_size_data *ssd;
577 ssd = set_size_data_alloc(pool, size);
582 ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
585 static void listener_destructor(void *obj)
587 struct ast_threadpool_listener *listener = obj;
589 listener->callbacks->destroy(listener->private_data);
592 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
593 const struct ast_threadpool_listener_callbacks *callbacks)
595 struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
599 listener->callbacks = callbacks;
600 listener->private_data = listener->callbacks->alloc(listener);
601 if (!listener->private_data) {
602 ao2_ref(listener, -1);
608 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
610 struct ast_threadpool *pool;
611 struct ast_taskprocessor *tps;
612 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
613 ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
620 tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
626 pool = tps_listener->private_data;
627 ast_threadpool_set_size(pool, initial_size);
631 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
633 return ast_taskprocessor_push(pool->tps, task, data);
636 void ast_threadpool_shutdown(struct ast_threadpool *pool)
638 /* Shut down the taskprocessors and everything else just
639 * takes care of itself via the taskprocessor callbacks
641 ast_taskprocessor_unreference(pool->control_tps);
642 ast_taskprocessor_unreference(pool->tps);
646 * A thread that executes threadpool tasks
648 struct worker_thread {
649 /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
651 /*! Condition used in conjunction with state changes */
653 /*! Lock used alongside the condition for state changes */
655 /*! The actual thread that is executing tasks */
657 /*! A pointer to the threadpool. Needed to be able to execute tasks */
658 struct ast_threadpool *pool;
659 /*! The current state of the worker thread */
660 enum worker_state state;
661 /*! A boolean used to determine if an idle thread should become active */
/*!
 * Source of worker thread IDs: a monotonically increasing integer,
 * advanced atomically each time a worker is allocated.
 */
static int worker_id_counter;
671 static int worker_thread_hash(const void *obj, int flags)
673 const struct worker_thread *worker = obj;
678 static int worker_thread_cmp(void *obj, void *arg, int flags)
680 struct worker_thread *worker1 = obj;
681 struct worker_thread *worker2 = arg;
683 return worker1->id == worker2->id ? CMP_MATCH : 0;
687 * \brief shut a worker thread down
689 * Set the worker dead and then wait for its thread
690 * to finish executing.
692 * \param worker The worker thread to shut down
694 static void worker_shutdown(struct worker_thread *worker)
696 worker_set_state(worker, DEAD);
697 if (worker->thread != AST_PTHREADT_NULL) {
698 pthread_join(worker->thread, NULL);
699 worker->thread = AST_PTHREADT_NULL;
704 * \brief Worker thread destructor
706 * Called automatically when refcount reaches 0. Shuts
707 * down the worker thread and destroys its component
710 static void worker_thread_destroy(void *obj)
712 struct worker_thread *worker = obj;
713 worker_shutdown(worker);
714 ast_mutex_destroy(&worker->lock);
715 ast_cond_destroy(&worker->cond);
719 * \brief start point for worker threads
721 * Worker threads start in the active state but may
722 * immediately go idle if there is no work to be
725 * \param arg The worker thread
728 static void *worker_start(void *arg)
730 struct worker_thread *worker = arg;
732 worker_active(worker);
737 * \brief Allocate and initialize a new worker thread
739 * This will create, initialize, and start the thread.
741 * \param pool The threadpool to which the worker will be added
742 * \retval NULL Failed to allocate or start the worker thread
743 * \retval non-NULL The newly-created worker thread
745 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
747 struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
751 worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
752 ast_mutex_init(&worker->lock);
753 ast_cond_init(&worker->cond, NULL);
755 worker->thread = AST_PTHREADT_NULL;
756 worker->state = ALIVE;
757 if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
758 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
766 * \brief Active loop for worker threads
768 * The worker will stay in this loop for its lifetime,
769 * executing tasks as they become available. If there
770 * are no tasks currently available, then the thread
773 * \param worker The worker thread executing tasks.
775 static void worker_active(struct worker_thread *worker)
779 if (threadpool_execute(worker->pool)) {
780 alive = worker_idle(worker);
786 * \brief Idle function for worker threads
788 * The worker waits here until it gets told by the threadpool
791 * \param worker The idle worker
792 * \retval 0 The thread is being woken up so that it can conclude.
793 * \retval non-zero The thread is being woken up to do more work.
795 static int worker_idle(struct worker_thread *worker)
797 SCOPED_MUTEX(lock, &worker->lock);
798 if (worker->state != ALIVE) {
801 threadpool_active_thread_idle(worker->pool, worker);
802 while (!worker->wake_up) {
803 ast_cond_wait(&worker->cond, lock);
806 return worker->state == ALIVE;
810 * \brief Change a worker's state
812 * The threadpool calls into this function in order to let a worker know
813 * how it should proceed.
815 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
817 SCOPED_MUTEX(lock, &worker->lock);
818 worker->state = state;
820 ast_cond_signal(&worker->cond);