Simplify threadpool refcounting a bit.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*! 
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*! 
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*! 
49          * \brief The container of zombie threads.
50          * Zombie threads may be running tasks, but they are scheduled to die soon
51          */
52         struct ao2_container *zombie_threads;
53         /*! 
54          * \brief The main taskprocessor
55          * 
56          * Tasks that are queued in this taskprocessor are
57          * doled out to the worker threads. Worker threads that
58          * execute tasks from the threadpool are executing tasks
59          * in this taskprocessor.
60          *
61          * The threadpool itself is actually the private data for
62          * this taskprocessor's listener. This way, as taskprocessor
63          * changes occur, the threadpool can alert its listeners
64          * appropriately.
65          */
66         struct ast_taskprocessor *tps;
67         /*!
68          * \brief The control taskprocessor
69          *
70          * This is a standard taskprocessor that uses the default
71          * taskprocessor listener. In other words, all tasks queued to
72          * this taskprocessor have a single thread that executes the
73          * tasks.
74          *
75          * All tasks that modify the state of the threadpool and all tasks
76          * that call out to threadpool listeners are pushed to this
77          * taskprocessor.
78          *
79          * For instance, when the threadpool changes sizes, a task is put
80          * into this taskprocessor to do so. When it comes time to tell the
81          * threadpool listener that worker threads have changed state,
82          * the task is placed in this taskprocessor.
83          *
84          * This is done for three main reasons
85          * 1) It ensures that listeners are given an accurate portrayal
86          * of the threadpool's current state. In other words, when a listener
87          * gets told a count of active, idle and zombie threads, it does not
88          * need to worry that internal state of the threadpool might be different
89          * from what it has been told.
90          * 2) It minimizes the locking required in both the threadpool and in
91          * threadpool listener's callbacks.
92          * 3) It ensures that listener callbacks are called in the same order
93          * that the threadpool had its state change.
94          */
95         struct ast_taskprocessor *control_tps;
96 };
97
98 /*!
99  * \brief states for worker threads
100  */
101 enum worker_state {
102         /*! The worker is either active or idle */
103         ALIVE,
104         /*!
105          * The worker has been asked to shut down but
106          * may still be in the process of executing tasks.
107          * This transition happens when the threadpool needs
108          * to shrink and needs to kill active threads in order
109          * to do so.
110          */
111         ZOMBIE,
112         /*!
113          * The worker has been asked to shut down. Typically
114          * only idle threads go to this state directly, but
115          * active threads may go straight to this state when
116          * the threadpool is shut down.
117          */
118         DEAD,
119 };
120
121 /* Worker thread forward declarations. See definitions for documentation */
122 struct worker_thread;
123 static int worker_thread_hash(const void *obj, int flags);
124 static int worker_thread_cmp(void *obj, void *arg, int flags);
125 static void worker_thread_destroy(void *obj);
126 static void worker_active(struct worker_thread *worker);
127 static void *worker_start(void *arg);
128 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
129 static int worker_idle(struct worker_thread *worker);
130 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
131 static int worker_shutdown(void *obj, void *arg, int flags);
132
133 /*!
134  * \brief Notify the threadpool listener that the state has changed.
135  *
136  * This notifies the threadpool listener via its state_changed callback.
137  * \param pool The threadpool whose state has changed
138  */
139 static void threadpool_send_state_changed(struct ast_threadpool *pool)
140 {
141         int active_size = ao2_container_count(pool->active_threads);
142         int idle_size = ao2_container_count(pool->idle_threads);
143         int zombie_size = ao2_container_count(pool->zombie_threads);
144
145         pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
146 }
147
148 /*!
149  * \brief Struct used for queued operations involving worker state changes
150  */
151 struct thread_worker_pair {
152         /*! Threadpool that contains the worker whose state has changed */
153         struct ast_threadpool *pool;
154         /*! Worker whose state has changed */
155         struct worker_thread *worker;
156 };
157
158 /*!
159  * \brief Destructor for thread_worker_pair
160  */
161 static void thread_worker_pair_destructor(void *obj)
162 {
163         struct thread_worker_pair *pair = obj;
164         ao2_ref(pair->worker, -1);
165 }
166
167 /*!
168  * \brief Allocate and initialize a thread_worker_pair
169  * \param pool Threadpool to assign to the thread_worker_pair
170  * \param worker Worker thread to assign to the thread_worker_pair
171  */
172 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
173                 struct worker_thread *worker)
174 {
175         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
176         if (!pair) {
177                 return NULL;
178         }
179         pair->pool = pool;
180         ao2_ref(worker, +1);
181         pair->worker = worker;
182         return pair;
183 }
184
185 /*!
186  * \brief Move a worker thread from the active container to the idle container.
187  *
188  * This function is called from the threadpool's control taskprocessor thread.
189  * \param data A thread_worker_pair containing the threadpool and the worker to move.
190  * \return 0
191  */
192 static int queued_active_thread_idle(void *data)
193 {
194         struct thread_worker_pair *pair = data;
195
196         ao2_link(pair->pool->idle_threads, pair->worker);
197         ao2_unlink(pair->pool->active_threads, pair->worker);
198
199         threadpool_send_state_changed(pair->pool);
200
201         ao2_ref(pair, -1);
202         return 0;
203 }
204
205 /*!
206  * \brief Queue a task to move a thread from the active list to the idle list
207  *
208  * This is called by a worker thread when it runs out of tasks to perform and
209  * goes idle.
210  * \param pool The threadpool to which the worker belongs
211  * \param worker The worker thread that has gone idle
212  */
213 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
214                 struct worker_thread *worker)
215 {
216         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
217         if (!pair) {
218                 return;
219         }
220         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
221 }
222
223 /*!
224  * \brief Kill a zombie thread
225  *
226  * This runs from the threadpool's control taskprocessor thread.
227  *
228  * \param data A thread_worker_pair containing the threadpool and the zombie thread
229  * \return 0
230  */
231 static int queued_zombie_thread_dead(void *data)
232 {
233         struct thread_worker_pair *pair = data;
234
235         ao2_unlink(pair->pool, pair->worker);
236         threadpool_send_state_changed(pair->pool);
237
238         ao2_ref(pair, -1);
239         return 0;
240 }
241
242 /*!
243  * \brief Queue a task to kill a zombie thread
244  *
245  * This is called by a worker thread when it acknowledges that it is time for
246  * it to die.
247  */
248 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
249                 struct worker_thread *worker)
250 {
251         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
252         if (!pair) {
253                 return;
254         }
255         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
256 }
257
258 /*!
259  * \brief Execute a task in the threadpool
260  * 
261  * This is the function that worker threads call in order to execute tasks
262  * in the threadpool
263  *
264  * \param pool The pool to which the tasks belong.
265  * \retval 0 Either the pool has been shut down or there are no tasks.
266  * \retval 1 There are still tasks remaining in the pool.
267  */
268 static int threadpool_execute(struct ast_threadpool *pool)
269 {
270         return ast_taskprocessor_execute(pool->tps);
271 }
272
273 /*!
274  * \brief Destroy a threadpool's components.
275  *
276  * This is the destructor called automatically when the threadpool's
277  * reference count reaches zero. This is not to be confused with
278  * threadpool_destroy.
279  *
280  * By the time this actually gets called, most of the cleanup has already
281  * been done in the pool. The only thing left to do is to release the
282  * final reference to the threadpool listener.
283  *
284  * \param obj The pool to destroy
285  */
286 static void threadpool_destructor(void *obj)
287 {
288         struct ast_threadpool *pool = obj;
289         /* XXX Probably should let the listener know we're being destroyed? */
290
291         /* Threads should all be shut down by now, so this should be a painless
292          * operation
293          */
294         ao2_cleanup(pool->listener);
295 }
296
297 /*
298  * \brief Allocate a threadpool
299  *
300  * This is implemented as a taskprocessor listener's alloc callback. This
301  * is because the threadpool exists as the private data on a taskprocessor
302  * listener.
303  *
304  * \param listener The taskprocessor listener where the threadpool will live.
305  * \retval NULL Could not initialize threadpool properly
306  * \retval non-NULL The newly-allocated threadpool
307  */
308 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
309 {
310         RAII_VAR(struct ast_threadpool *, pool,
311                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
312
313         pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
314         if (!pool->control_tps) {
315                 return NULL;
316         }
317         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
318         if (!pool->active_threads) {
319                 return NULL;
320         }
321         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
322         if (!pool->idle_threads) {
323                 return NULL;
324         }
325         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
326         if (!pool->zombie_threads) {
327                 return NULL;
328         }
329
330         pool->tps = listener->tps;
331
332         ao2_ref(pool, +1);
333         return pool;
334 }
335
336 /*!
337  * \brief helper used for queued task when tasks are pushed
338  */
339 struct task_pushed_data {
340         /*! Pool into which a task was pushed */
341         struct ast_threadpool *pool;
342         /*! Indicator of whether the pool had no tasks prior to the new task being added */
343         int was_empty;
344 };
345
346 /*!
347  * \brief Allocate and initialize a task_pushed_data
348  * \param pool The threadpool to set in the task_pushed_data
349  * \param was_empty The was_empty value to set in the task_pushed_data
350  * \retval NULL Unable to allocate task_pushed_data
351  * \retval non-NULL The newly-allocated task_pushed_data
352  */
353 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
354                 int was_empty)
355 {
356         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
357
358         if (!tpd) {
359                 return NULL;
360         }
361         tpd->pool = pool;
362         tpd->was_empty = was_empty;
363         return tpd;
364 }
365
366 /*!
367  * \brief Activate idle threads
368  *
369  * This function always returns CMP_MATCH because all threads that this
370  * function acts on need to be seen as matches so they are unlinked from the
371  * list of idle threads.
372  *
373  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
374  * \param obj The worker to activate
375  * \param arg The pool where the worker belongs
376  * \retval CMP_MATCH
377  */
378 static int activate_threads(void *obj, void *arg, int flags)
379 {
380         struct worker_thread *worker = obj;
381         struct ast_threadpool *pool = arg;
382
383         ao2_link(pool->active_threads, worker);
384         worker_set_state(worker, ALIVE);
385         return CMP_MATCH;
386 }
387
388 /*!
389  * \brief Queue task called when tasks are pushed into the threadpool
390  *
391  * This function first calls into the threadpool's listener to let it know
392  * that a task has been pushed. It then wakes up all idle threads and moves
393  * them into the active thread container.
394  * \param data A task_pushed_data
395  * \return 0
396  */
397 static int handle_task_pushed(void *data)
398 {
399         struct task_pushed_data *tpd = data;
400         struct ast_threadpool *pool = tpd->pool;
401         int was_empty = tpd->was_empty;
402
403         pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
404         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
405                         activate_threads, pool);
406         ao2_ref(tpd, -1);
407         return 0;
408 }
409
410 /*!
411  * \brief Taskprocessor listener callback called when a task is added
412  *
413  * The threadpool uses this opportunity to queue a task on its control taskprocessor
414  * in order to activate idle threads and notify the threadpool listener that the
415  * task has been pushed.
416  * \param listener The taskprocessor listener. The threadpool is the listener's private data
417  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
418  */
419 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
420                 int was_empty)
421 {
422         struct ast_threadpool *pool = listener->private_data;
423         struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
424
425         if (!tpd) {
426                 return;
427         }
428
429         ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
430 }
431
432 /*!
433  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
434  *
435  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
436  * \param data The pool that has become empty
437  * \return 0
438  */
439 static int handle_emptied(void *data)
440 {
441         struct ast_threadpool *pool = data;
442
443         pool->listener->callbacks->emptied(pool->listener);
444         return 0;
445 }
446
447 /*!
448  * \brief Taskprocessor listener emptied callback
449  *
450  * The threadpool queues a task to let the threadpool listener know that
451  * the threadpool no longer contains any tasks.
452  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
453  */
454 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
455 {
456         struct ast_threadpool *pool = listener->private_data;
457
458         ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
459 }
460
461 /*!
462  * \brief Taskprocessor listener shutdown callback
463  *
464  * The threadpool will shut down and destroy all of its worker threads when
465  * this is called back. By the time this gets called, the taskprocessor's
466  * control taskprocessor has already been destroyed. Therefore there is no risk
467  * in outright destroying the worker threads here.
468  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
469  */
470 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
471 {
472         struct ast_threadpool *pool = listener->private_data;
473         int flags = (OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE);
474
475         ao2_cleanup(pool->active_threads);
476         ao2_cleanup(pool->idle_threads);
477         ao2_cleanup(pool->zombie_threads);
478 }
479
480 /*!
481  * \brief Taskprocessor listener destroy callback
482  *
483  * Since the threadpool is an ao2 object, all that is necessary is to
484  * decrease the refcount. Since the control taskprocessor should already
485  * be destroyed by this point, this should be the final reference to the
486  * threadpool.
487  *
488  * \param private_data The threadpool to destroy
489  */
490 static void threadpool_destroy(void *private_data)
491 {
492         struct ast_threadpool *pool = private_data;
493         ao2_cleanup(pool);
494 }
495
496 /*!
497  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
498  */
499 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
500         .alloc = threadpool_alloc,
501         .task_pushed = threadpool_tps_task_pushed,
502         .emptied = threadpool_tps_emptied,
503         .shutdown = threadpool_tps_shutdown,
504         .destroy = threadpool_destroy,
505 };
506
507 /*!
508  * \brief Add threads to the threadpool
509  *
510  * This function is called from the threadpool's control taskprocessor thread.
511  * \param pool The pool that is expanding
512  * \delta The number of threads to add to the pool
513  */
514 static void grow(struct ast_threadpool *pool, int delta)
515 {
516         int i;
517         for (i = 0; i < delta; ++i) {
518                 struct worker_thread *worker = worker_thread_alloc(pool);
519                 if (!worker) {
520                         return;
521                 }
522                 ao2_link(pool->active_threads, worker);
523         }
524 }
525
526 /*!
527  * \brief ao2 callback to kill a set number of threads.
528  *
529  * Threads will be unlinked from the container as long as the
530  * counter has not reached zero. The counter is decremented with
531  * each thread that is removed.
532  * \param obj The worker thread up for possible destruction
533  * \param arg The counter
534  * \param flags Unused
535  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
536  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
537  */
538 static int kill_threads(void *obj, void *arg, int flags)
539 {
540         struct worker_thread *worker = obj;
541         int *num_to_kill = arg;
542
543         if ((*num_to_kill)-- > 0) {
544                 return CMP_MATCH;
545         } else {
546                 return CMP_STOP;
547         }
548 }
549
550 /*!
551  * \brief ao2 callback to zombify a set number of threads.
552  *
553  * Threads will be zombified as long as as the counter has not reached
554  * zero. The counter is decremented with each thread that is zombified.
555  *
556  * Zombifying a thread involves removing it from its current container,
557  * adding it to the zombie container, and changing the state of the
558  * worker to a zombie
559  *
560  * This callback is called from the threadpool control taskprocessor thread.
561  *
562  * \param obj The worker thread that may be zombified
563  * \param arg The pool to which the worker belongs
564  * \param data The counter
565  * \param flags Unused
566  * \retval CMP_MATCH The zombified thread should be removed from its current container
567  * \retval CMP_STOP Stop attempting to zombify threads
568  */
569 static int zombify_threads(void *obj, void *arg, void *data, int flags)
570 {
571         struct worker_thread *worker = obj;
572         struct ast_threadpool *pool = arg;
573         int *num_to_zombify = data;
574
575         if ((*num_to_zombify)-- > 0) {
576                 ao2_link(pool->zombie_threads, worker);
577                 worker_set_state(worker, ZOMBIE);
578                 return CMP_MATCH;
579         } else {
580                 return CMP_STOP;
581         }
582 }
583
584 /*!
585  * \brief Remove threads from the threadpool
586  *
587  * The preference is to kill idle threads. However, if there are
588  * more threads to remove than there are idle threads, then active
589  * threads will be zombified instead.
590  *
591  * This function is called from the threadpool control taskprocessor thread.
592  *
593  * \param pool The threadpool to remove threads from
594  * \param delta The number of threads to remove
595  */
596 static void shrink(struct ast_threadpool *pool, int delta)
597 {
598         /* 
599          * Preference is to kill idle threads, but
600          * we'll move on to deactivating active threads
601          * if we have to
602          */
603         int idle_threads = ao2_container_count(pool->idle_threads);
604         int idle_threads_to_kill = MIN(delta, idle_threads);
605         int active_threads_to_zombify = delta - idle_threads_to_kill;
606
607         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
608                         kill_threads, &idle_threads_to_kill);
609
610         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
611                         zombify_threads, pool, &active_threads_to_zombify);
612 }
613
614 /*!
615  * \brief Helper struct used for queued operations that change the size of the threadpool
616  */
617 struct set_size_data {
618         /*! The pool whose size is to change */
619         struct ast_threadpool *pool;
620         /*! The requested new size of the pool */
621         unsigned int size;
622 };
623
624 /*!
625  * \brief Allocate and initialize a set_size_data
626  * \param pool The pool for the set_size_data
627  * \param size The size to store in the set_size_data
628  */
629 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
630                 unsigned int size)
631 {
632         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
633         if (!ssd) {
634                 return NULL;
635         }
636
637         ssd->pool = pool;
638         ssd->size = size;
639         return ssd;
640 }
641
642 /*!
643  * \brief Change the size of the threadpool
644  *
645  * This can either result in shrinking or growing the threadpool depending
646  * on the new desired size and the current size.
647  *
648  * This function is run from the threadpool control taskprocessor thread
649  *
650  * \param data A set_size_data used for determining how to act
651  * \return 0
652  */
653 static int queued_set_size(void *data)
654 {
655         struct set_size_data *ssd = data;
656         struct ast_threadpool *pool = ssd->pool;
657         unsigned int num_threads = ssd->size;
658
659         /* We don't count zombie threads as being "live when potentially resizing */
660         unsigned int current_size = ao2_container_count(pool->active_threads) +
661                 ao2_container_count(pool->idle_threads);
662
663         if (current_size == num_threads) {
664                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
665                                 num_threads, current_size);
666                 return 0;
667         }
668
669         if (current_size < num_threads) {
670                 grow(pool, num_threads - current_size);
671         } else {
672                 shrink(pool, current_size - num_threads);
673         }
674
675         threadpool_send_state_changed(pool);
676         ao2_ref(ssd, -1);
677         return 0;
678 }
679
680 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
681 {
682         struct set_size_data *ssd;
683
684         ssd = set_size_data_alloc(pool, size);
685         if (!ssd) {
686                 return;
687         }
688
689         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
690 }
691
692 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
693 {
694         struct ast_threadpool *pool;
695         struct ast_taskprocessor *tps;
696         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
697                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
698                         ao2_cleanup);
699
700         if (!tps_listener) {
701                 return NULL;
702         }
703
704         tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
705
706         if (!tps) {
707                 return NULL;
708         }
709
710         pool = tps_listener->private_data;
711         ast_threadpool_set_size(pool, initial_size);
712         return pool;
713 }
714
715 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
716 {
717         return ast_taskprocessor_push(pool->tps, task, data);
718 }
719
720 void ast_threadpool_shutdown(struct ast_threadpool *pool)
721 {
722         /* Shut down the taskprocessors and everything else just
723          * takes care of itself via the taskprocessor callbacks
724          */
725         ast_taskprocessor_unreference(pool->control_tps);
726         ast_taskprocessor_unreference(pool->tps);
727 }
728
729 /*!
730  * A thread that executes threadpool tasks
731  */
732 struct worker_thread {
733         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
734         int id;
735         /*! Condition used in conjunction with state changes */
736         ast_cond_t cond;
737         /*! Lock used alongside the condition for state changes */
738         ast_mutex_t lock;
739         /*! The actual thread that is executing tasks */
740         pthread_t thread;
741         /*! A pointer to the threadpool. Needed to be able to execute tasks */
742         struct ast_threadpool *pool;
743         /*! The current state of the worker thread */
744         enum worker_state state;
745         /*! A boolean used to determine if an idle thread should become active */
746         int wake_up;
747 };
748
749 /*!
750  * A monotonically increasing integer used for worker
751  * thread identification.
752  */
753 static int worker_id_counter;
754
755 static int worker_thread_hash(const void *obj, int flags)
756 {
757         const struct worker_thread *worker = obj;
758
759         return worker->id;
760 }
761
762 static int worker_thread_cmp(void *obj, void *arg, int flags)
763 {
764         struct worker_thread *worker1 = obj;
765         struct worker_thread *worker2 = arg;
766
767         return worker1->id == worker2->id ? CMP_MATCH : 0;
768 }
769
770 /*!
771  * \brief shut a worker thread down
772  *
773  * Set the worker dead and then wait for its thread
774  * to finish executing.
775  *
776  * \param worker The worker thread to shut down
777  */
778 static void worker_shutdown(struct worker_thread *worker)
779 {
780         struct worker_thread *worker = obj;
781
782         worker_set_state(worker, DEAD);
783         if (worker->thread != AST_PTHREADT_NULL) {
784                 pthread_join(worker->thread, NULL);
785                 worker->thread = AST_PTHREADT_NULL;
786         }
787 }
788
789 /*!
790  * \brief Worker thread destructor
791  *
792  * Called automatically when refcount reaches 0. Shuts
793  * down the worker thread and destroys its component
794  * parts
795  */
796 static void worker_thread_destroy(void *obj)
797 {
798         struct worker_thread *worker = obj;
799         worker_shutdown(worker);
800         ast_mutex_destroy(&worker->lock);
801         ast_cond_destroy(&worker->cond);
802 }
803
804 /*!
805  * \brief start point for worker threads
806  *
807  * Worker threads start in the active state but may
808  * immediately go idle if there is no work to be
809  * done
810  *
811  * \param arg The worker thread
812  * \retval NULL
813  */
814 static void *worker_start(void *arg)
815 {
816         struct worker_thread *worker = arg;
817
818         worker_active(worker);
819         return NULL;
820 }
821
822 /*!
823  * \brief Allocate and initialize a new worker thread
824  *
825  * This will create, initialize, and start the thread.
826  *
827  * \param pool The threadpool to which the worker will be added
828  * \retval NULL Failed to allocate or start the worker thread
829  * \retval non-NULL The newly-created worker thread
830  */
831 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
832 {
833         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
834         if (!worker) {
835                 return NULL;
836         }
837         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
838         ast_mutex_init(&worker->lock);
839         ast_cond_init(&worker->cond, NULL);
840         worker->pool = pool;
841         worker->thread = AST_PTHREADT_NULL;
842         worker->state = ALIVE;
843         if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
844                 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
845                 ao2_ref(worker, -1);
846                 return NULL;
847         }
848         return worker;
849 }
850
851 /*!
852  * \brief Active loop for worker threads
853  *
854  * The worker will stay in this loop for its lifetime,
855  * executing tasks as they become available. If there
856  * are no tasks currently available, then the thread
857  * will go idle.
858  *
859  * \param worker The worker thread executing tasks.
860  */
861 static void worker_active(struct worker_thread *worker)
862 {
863         int alive = 1;
864         while (alive) {
865                 if (threadpool_execute(worker->pool)) {
866                         alive = worker_idle(worker);
867                 }
868         }
869
870         /* Reaching this portion means the thread is
871          * on death's door. It may have been killed while
872          * it was idle, in which case it can just die
873          * peacefully. If it's a zombie, though, then
874          * it needs to let the pool know so
875          * that the thread can be removed from the
876          * list of zombie threads.
877          */
878         if (worker->state == ZOMBIE) {
879                 threadpool_zombie_thread_dead(worker->pool, worker);
880         }
881 }
882
883 /*!
884  * \brief Idle function for worker threads
885  *
886  * The worker waits here until it gets told by the threadpool
887  * to wake up.
888  *
889  * \param worker The idle worker
890  * \retval 0 The thread is being woken up so that it can conclude.
891  * \retval non-zero The thread is being woken up to do more work.
892  */
893 static int worker_idle(struct worker_thread *worker)
894 {
895         SCOPED_MUTEX(lock, &worker->lock);
896         if (worker->state != ALIVE) {
897                 return 0;
898         }
899         threadpool_active_thread_idle(worker->pool, worker);
900         while (!worker->wake_up) {
901                 ast_cond_wait(&worker->cond, lock);
902         }
903         worker->wake_up = 0;
904         return worker->state == ALIVE;
905 }
906
907 /*!
908  * \brief Change a worker's state
909  *
910  * The threadpool calls into this function in order to let a worker know
911  * how it should proceed.
912  */
913 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
914 {
915         SCOPED_MUTEX(lock, &worker->lock);
916         worker->state = state;
917         worker->wake_up = 1;
918         ast_cond_signal(&worker->cond);
919 }
920