8662c3a393dfe87f9eb5a4c78c54448f835a48ff
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*! 
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*! 
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*! 
49          * \brief The main taskprocessor
50          * 
51          * Tasks that are queued in this taskprocessor are
52          * doled out to the worker threads. Worker threads that
53          * execute tasks from the threadpool are executing tasks
54          * in this taskprocessor.
55          *
56          * The threadpool itself is actually the private data for
57          * this taskprocessor's listener. This way, as taskprocessor
58          * changes occur, the threadpool can alert its listeners
59          * appropriately.
60          */
61         struct ast_taskprocessor *tps;
62         /*!
63          * \brief The control taskprocessor
64          *
65          * This is a standard taskprocessor that uses the default
66          * taskprocessor listener. In other words, all tasks queued to
67          * this taskprocessor have a single thread that executes the
68          * tasks.
69          *
70          * All tasks that modify the state of the threadpool and all tasks
71          * that call out to threadpool listeners are pushed to this
72          * taskprocessor.
73          *
74          * For instance, when the threadpool changes sizes, a task is put
75          * into this taskprocessor to do so. When it comes time to tell the
76          * threadpool listener that worker threads have changed state,
77          * the task is placed in this taskprocessor.
78          *
79          * This is done for three main reasons
80          * 1) It ensures that listeners are given an accurate portrayal
81          * of the threadpool's current state. In other words, when a listener
82          * gets told a count of active and idle threads, it does not
83          * need to worry that internal state of the threadpool might be different
84          * from what it has been told.
85          * 2) It minimizes the locking required in both the threadpool and in
86          * threadpool listener's callbacks.
87          * 3) It ensures that listener callbacks are called in the same order
88          * that the threadpool had its state change.
89          */
90         struct ast_taskprocessor *control_tps;
91 };
92
93 /*!
94  * \brief states for worker threads
95  */
96 enum worker_state {
97         /*! The worker is either active or idle */
98         ALIVE,
99         /*! The worker has been asked to shut down. */
100         DEAD,
101 };
102
103 /* Worker thread forward declarations. See definitions for documentation */
104 struct worker_thread;
105 static int worker_thread_hash(const void *obj, int flags);
106 static int worker_thread_cmp(void *obj, void *arg, int flags);
107 static void worker_thread_destroy(void *obj);
108 static void worker_active(struct worker_thread *worker);
109 static void *worker_start(void *arg);
110 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
111 static int worker_idle(struct worker_thread *worker);
112 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
113 static void worker_shutdown(struct worker_thread *worker);
114
115 /*!
116  * \brief Notify the threadpool listener that the state has changed.
117  *
118  * This notifies the threadpool listener via its state_changed callback.
119  * \param pool The threadpool whose state has changed
120  */
121 static void threadpool_send_state_changed(struct ast_threadpool *pool)
122 {
123         int active_size = ao2_container_count(pool->active_threads);
124         int idle_size = ao2_container_count(pool->idle_threads);
125
126         pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
127 }
128
129 /*!
130  * \brief Struct used for queued operations involving worker state changes
131  */
132 struct thread_worker_pair {
133         /*! Threadpool that contains the worker whose state has changed */
134         struct ast_threadpool *pool;
135         /*! Worker whose state has changed */
136         struct worker_thread *worker;
137 };
138
139 /*!
140  * \brief Destructor for thread_worker_pair
141  */
142 static void thread_worker_pair_destructor(void *obj)
143 {
144         struct thread_worker_pair *pair = obj;
145         ao2_ref(pair->worker, -1);
146 }
147
148 /*!
149  * \brief Allocate and initialize a thread_worker_pair
150  * \param pool Threadpool to assign to the thread_worker_pair
151  * \param worker Worker thread to assign to the thread_worker_pair
152  */
153 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
154                 struct worker_thread *worker)
155 {
156         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
157         if (!pair) {
158                 return NULL;
159         }
160         pair->pool = pool;
161         ao2_ref(worker, +1);
162         pair->worker = worker;
163         return pair;
164 }
165
166 /*!
167  * \brief Move a worker thread from the active container to the idle container.
168  *
169  * This function is called from the threadpool's control taskprocessor thread.
170  * \param data A thread_worker_pair containing the threadpool and the worker to move.
171  * \return 0
172  */
173 static int queued_active_thread_idle(void *data)
174 {
175         struct thread_worker_pair *pair = data;
176
177         ao2_link(pair->pool->idle_threads, pair->worker);
178         ao2_unlink(pair->pool->active_threads, pair->worker);
179
180         threadpool_send_state_changed(pair->pool);
181
182         ao2_ref(pair, -1);
183         return 0;
184 }
185
186 /*!
187  * \brief Queue a task to move a thread from the active list to the idle list
188  *
189  * This is called by a worker thread when it runs out of tasks to perform and
190  * goes idle.
191  * \param pool The threadpool to which the worker belongs
192  * \param worker The worker thread that has gone idle
193  */
194 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
195                 struct worker_thread *worker)
196 {
197         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
198         if (!pair) {
199                 return;
200         }
201         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
202 }
203
204 /*!
205  * \brief Execute a task in the threadpool
206  * 
207  * This is the function that worker threads call in order to execute tasks
208  * in the threadpool
209  *
210  * \param pool The pool to which the tasks belong.
211  * \retval 0 Either the pool has been shut down or there are no tasks.
212  * \retval 1 There are still tasks remaining in the pool.
213  */
214 static int threadpool_execute(struct ast_threadpool *pool)
215 {
216         return ast_taskprocessor_execute(pool->tps);
217 }
218
219 /*!
220  * \brief Destroy a threadpool's components.
221  *
222  * This is the destructor called automatically when the threadpool's
223  * reference count reaches zero. This is not to be confused with
224  * threadpool_destroy.
225  *
226  * By the time this actually gets called, most of the cleanup has already
227  * been done in the pool. The only thing left to do is to release the
228  * final reference to the threadpool listener.
229  *
230  * \param obj The pool to destroy
231  */
232 static void threadpool_destructor(void *obj)
233 {
234         struct ast_threadpool *pool = obj;
235         ao2_cleanup(pool->listener);
236 }
237
238 /*
239  * \brief Allocate a threadpool
240  *
241  * This is implemented as a taskprocessor listener's alloc callback. This
242  * is because the threadpool exists as the private data on a taskprocessor
243  * listener.
244  *
245  * \param listener The taskprocessor listener where the threadpool will live.
246  * \retval NULL Could not initialize threadpool properly
247  * \retval non-NULL The newly-allocated threadpool
248  */
249 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
250 {
251         RAII_VAR(struct ast_threadpool *, pool,
252                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
253
254         pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
255         if (!pool->control_tps) {
256                 return NULL;
257         }
258         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
259         if (!pool->active_threads) {
260                 return NULL;
261         }
262         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
263         if (!pool->idle_threads) {
264                 return NULL;
265         }
266
267         pool->tps = listener->tps;
268
269         ao2_ref(pool, +1);
270         return pool;
271 }
272
273 /*!
274  * \brief helper used for queued task when tasks are pushed
275  */
276 struct task_pushed_data {
277         /*! Pool into which a task was pushed */
278         struct ast_threadpool *pool;
279         /*! Indicator of whether the pool had no tasks prior to the new task being added */
280         int was_empty;
281 };
282
283 /*!
284  * \brief Allocate and initialize a task_pushed_data
285  * \param pool The threadpool to set in the task_pushed_data
286  * \param was_empty The was_empty value to set in the task_pushed_data
287  * \retval NULL Unable to allocate task_pushed_data
288  * \retval non-NULL The newly-allocated task_pushed_data
289  */
290 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
291                 int was_empty)
292 {
293         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
294
295         if (!tpd) {
296                 return NULL;
297         }
298         tpd->pool = pool;
299         tpd->was_empty = was_empty;
300         return tpd;
301 }
302
303 /*!
304  * \brief Activate idle threads
305  *
306  * This function always returns CMP_MATCH because all threads that this
307  * function acts on need to be seen as matches so they are unlinked from the
308  * list of idle threads.
309  *
310  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
311  * \param obj The worker to activate
312  * \param arg The pool where the worker belongs
313  * \retval CMP_MATCH
314  */
315 static int activate_threads(void *obj, void *arg, int flags)
316 {
317         struct worker_thread *worker = obj;
318         struct ast_threadpool *pool = arg;
319
320         ao2_link(pool->active_threads, worker);
321         worker_set_state(worker, ALIVE);
322         return CMP_MATCH;
323 }
324
325 /*!
326  * \brief Queue task called when tasks are pushed into the threadpool
327  *
328  * This function first calls into the threadpool's listener to let it know
329  * that a task has been pushed. It then wakes up all idle threads and moves
330  * them into the active thread container.
331  * \param data A task_pushed_data
332  * \return 0
333  */
334 static int handle_task_pushed(void *data)
335 {
336         struct task_pushed_data *tpd = data;
337         struct ast_threadpool *pool = tpd->pool;
338         int was_empty = tpd->was_empty;
339
340         pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
341         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
342                         activate_threads, pool);
343         ao2_ref(tpd, -1);
344         return 0;
345 }
346
347 /*!
348  * \brief Taskprocessor listener callback called when a task is added
349  *
350  * The threadpool uses this opportunity to queue a task on its control taskprocessor
351  * in order to activate idle threads and notify the threadpool listener that the
352  * task has been pushed.
353  * \param listener The taskprocessor listener. The threadpool is the listener's private data
354  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
355  */
356 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
357                 int was_empty)
358 {
359         struct ast_threadpool *pool = listener->private_data;
360         struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
361
362         if (!tpd) {
363                 return;
364         }
365
366         ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
367 }
368
369 /*!
370  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
371  *
372  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
373  * \param data The pool that has become empty
374  * \return 0
375  */
376 static int handle_emptied(void *data)
377 {
378         struct ast_threadpool *pool = data;
379
380         pool->listener->callbacks->emptied(pool, pool->listener);
381         return 0;
382 }
383
384 /*!
385  * \brief Taskprocessor listener emptied callback
386  *
387  * The threadpool queues a task to let the threadpool listener know that
388  * the threadpool no longer contains any tasks.
389  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
390  */
391 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
392 {
393         struct ast_threadpool *pool = listener->private_data;
394
395         ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
396 }
397
398 /*!
399  * \brief Taskprocessor listener shutdown callback
400  *
401  * The threadpool will shut down and destroy all of its worker threads when
402  * this is called back. By the time this gets called, the taskprocessor's
403  * control taskprocessor has already been destroyed. Therefore there is no risk
404  * in outright destroying the worker threads here.
405  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
406  */
407 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
408 {
409         struct ast_threadpool *pool = listener->private_data;
410
411         ao2_cleanup(pool->active_threads);
412         ao2_cleanup(pool->idle_threads);
413 }
414
415 /*!
416  * \brief Taskprocessor listener destroy callback
417  *
418  * Since the threadpool is an ao2 object, all that is necessary is to
419  * decrease the refcount. Since the control taskprocessor should already
420  * be destroyed by this point, this should be the final reference to the
421  * threadpool.
422  *
423  * \param private_data The threadpool to destroy
424  */
425 static void threadpool_destroy(void *private_data)
426 {
427         struct ast_threadpool *pool = private_data;
428         ao2_cleanup(pool);
429 }
430
431 /*!
432  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
433  */
434 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
435         .alloc = threadpool_alloc,
436         .task_pushed = threadpool_tps_task_pushed,
437         .emptied = threadpool_tps_emptied,
438         .shutdown = threadpool_tps_shutdown,
439         .destroy = threadpool_destroy,
440 };
441
442 /*!
443  * \brief Add threads to the threadpool
444  *
445  * This function is called from the threadpool's control taskprocessor thread.
446  * \param pool The pool that is expanding
447  * \delta The number of threads to add to the pool
448  */
449 static void grow(struct ast_threadpool *pool, int delta)
450 {
451         int i;
452         for (i = 0; i < delta; ++i) {
453                 struct worker_thread *worker = worker_thread_alloc(pool);
454                 if (!worker) {
455                         return;
456                 }
457                 ao2_link(pool->active_threads, worker);
458         }
459 }
460
461 /*!
462  * \brief ao2 callback to kill a set number of threads.
463  *
464  * Threads will be unlinked from the container as long as the
465  * counter has not reached zero. The counter is decremented with
466  * each thread that is removed.
467  * \param obj The worker thread up for possible destruction
468  * \param arg The counter
469  * \param flags Unused
470  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
471  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
472  */
473 static int kill_threads(void *obj, void *arg, int flags)
474 {
475         int *num_to_kill = arg;
476
477         if ((*num_to_kill)-- > 0) {
478                 return CMP_MATCH;
479         } else {
480                 return CMP_STOP;
481         }
482 }
483
484 /*!
485  * \brief Remove threads from the threadpool
486  *
487  * The preference is to kill idle threads. However, if there are
488  * more threads to remove than there are idle threads, then active
489  * threads will be removed too.
490  *
491  * This function is called from the threadpool control taskprocessor thread.
492  *
493  * \param pool The threadpool to remove threads from
494  * \param delta The number of threads to remove
495  */
496 static void shrink(struct ast_threadpool *pool, int delta)
497 {
498         int idle_threads = ao2_container_count(pool->idle_threads);
499         int idle_threads_to_kill = MIN(delta, idle_threads);
500         int active_threads_to_kill = delta - idle_threads_to_kill;
501
502         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
503                         kill_threads, &idle_threads_to_kill);
504
505         ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
506                         kill_threads, &active_threads_to_kill);
507 }
508
509 /*!
510  * \brief Helper struct used for queued operations that change the size of the threadpool
511  */
512 struct set_size_data {
513         /*! The pool whose size is to change */
514         struct ast_threadpool *pool;
515         /*! The requested new size of the pool */
516         unsigned int size;
517 };
518
519 /*!
520  * \brief Allocate and initialize a set_size_data
521  * \param pool The pool for the set_size_data
522  * \param size The size to store in the set_size_data
523  */
524 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
525                 unsigned int size)
526 {
527         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
528         if (!ssd) {
529                 return NULL;
530         }
531
532         ssd->pool = pool;
533         ssd->size = size;
534         return ssd;
535 }
536
537 /*!
538  * \brief Change the size of the threadpool
539  *
540  * This can either result in shrinking or growing the threadpool depending
541  * on the new desired size and the current size.
542  *
543  * This function is run from the threadpool control taskprocessor thread
544  *
545  * \param data A set_size_data used for determining how to act
546  * \return 0
547  */
548 static int queued_set_size(void *data)
549 {
550         struct set_size_data *ssd = data;
551         struct ast_threadpool *pool = ssd->pool;
552         unsigned int new_size = ssd->size;
553         unsigned int current_size = ao2_container_count(pool->active_threads) +
554                 ao2_container_count(pool->idle_threads);
555
556         if (current_size == new_size) {
557                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
558                                 new_size, current_size);
559                 return 0;
560         }
561
562         if (current_size < new_size) {
563                 grow(pool, new_size - current_size);
564         } else {
565                 shrink(pool, current_size - new_size);
566         }
567
568         threadpool_send_state_changed(pool);
569         ao2_ref(ssd, -1);
570         return 0;
571 }
572
573 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
574 {
575         struct set_size_data *ssd;
576
577         ssd = set_size_data_alloc(pool, size);
578         if (!ssd) {
579                 return;
580         }
581
582         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
583 }
584
585 static void listener_destructor(void *obj)
586 {
587         struct ast_threadpool_listener *listener = obj;
588
589         listener->callbacks->destroy(listener->private_data);
590 }
591
592 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
593                 const struct ast_threadpool_listener_callbacks *callbacks)
594 {
595         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
596         if (!listener) {
597                 return NULL;
598         }
599         listener->callbacks = callbacks;
600         listener->private_data = listener->callbacks->alloc(listener);
601         if (!listener->private_data) {
602                 ao2_ref(listener, -1);
603                 return NULL;
604         }
605         return listener;
606 }
607
608 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
609 {
610         struct ast_threadpool *pool;
611         struct ast_taskprocessor *tps;
612         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
613                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
614                         ao2_cleanup);
615
616         if (!tps_listener) {
617                 return NULL;
618         }
619
620         tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
621
622         if (!tps) {
623                 return NULL;
624         }
625
626         pool = tps_listener->private_data;
627         ast_threadpool_set_size(pool, initial_size);
628         return pool;
629 }
630
631 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
632 {
633         return ast_taskprocessor_push(pool->tps, task, data);
634 }
635
636 void ast_threadpool_shutdown(struct ast_threadpool *pool)
637 {
638         /* Shut down the taskprocessors and everything else just
639          * takes care of itself via the taskprocessor callbacks
640          */
641         ast_taskprocessor_unreference(pool->control_tps);
642         ast_taskprocessor_unreference(pool->tps);
643 }
644
645 /*!
646  * A thread that executes threadpool tasks
647  */
648 struct worker_thread {
649         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
650         int id;
651         /*! Condition used in conjunction with state changes */
652         ast_cond_t cond;
653         /*! Lock used alongside the condition for state changes */
654         ast_mutex_t lock;
655         /*! The actual thread that is executing tasks */
656         pthread_t thread;
657         /*! A pointer to the threadpool. Needed to be able to execute tasks */
658         struct ast_threadpool *pool;
659         /*! The current state of the worker thread */
660         enum worker_state state;
661         /*! A boolean used to determine if an idle thread should become active */
662         int wake_up;
663 };
664
665 /*!
666  * A monotonically increasing integer used for worker
667  * thread identification.
668  */
669 static int worker_id_counter;
670
671 static int worker_thread_hash(const void *obj, int flags)
672 {
673         const struct worker_thread *worker = obj;
674
675         return worker->id;
676 }
677
678 static int worker_thread_cmp(void *obj, void *arg, int flags)
679 {
680         struct worker_thread *worker1 = obj;
681         struct worker_thread *worker2 = arg;
682
683         return worker1->id == worker2->id ? CMP_MATCH : 0;
684 }
685
686 /*!
687  * \brief shut a worker thread down
688  *
689  * Set the worker dead and then wait for its thread
690  * to finish executing.
691  *
692  * \param worker The worker thread to shut down
693  */
694 static void worker_shutdown(struct worker_thread *worker)
695 {
696         worker_set_state(worker, DEAD);
697         if (worker->thread != AST_PTHREADT_NULL) {
698                 pthread_join(worker->thread, NULL);
699                 worker->thread = AST_PTHREADT_NULL;
700         }
701 }
702
703 /*!
704  * \brief Worker thread destructor
705  *
706  * Called automatically when refcount reaches 0. Shuts
707  * down the worker thread and destroys its component
708  * parts
709  */
710 static void worker_thread_destroy(void *obj)
711 {
712         struct worker_thread *worker = obj;
713         worker_shutdown(worker);
714         ast_mutex_destroy(&worker->lock);
715         ast_cond_destroy(&worker->cond);
716 }
717
718 /*!
719  * \brief start point for worker threads
720  *
721  * Worker threads start in the active state but may
722  * immediately go idle if there is no work to be
723  * done
724  *
725  * \param arg The worker thread
726  * \retval NULL
727  */
728 static void *worker_start(void *arg)
729 {
730         struct worker_thread *worker = arg;
731
732         worker_active(worker);
733         return NULL;
734 }
735
736 /*!
737  * \brief Allocate and initialize a new worker thread
738  *
739  * This will create, initialize, and start the thread.
740  *
741  * \param pool The threadpool to which the worker will be added
742  * \retval NULL Failed to allocate or start the worker thread
743  * \retval non-NULL The newly-created worker thread
744  */
745 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
746 {
747         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
748         if (!worker) {
749                 return NULL;
750         }
751         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
752         ast_mutex_init(&worker->lock);
753         ast_cond_init(&worker->cond, NULL);
754         worker->pool = pool;
755         worker->thread = AST_PTHREADT_NULL;
756         worker->state = ALIVE;
757         if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
758                 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
759                 ao2_ref(worker, -1);
760                 return NULL;
761         }
762         return worker;
763 }
764
765 /*!
766  * \brief Active loop for worker threads
767  *
768  * The worker will stay in this loop for its lifetime,
769  * executing tasks as they become available. If there
770  * are no tasks currently available, then the thread
771  * will go idle.
772  *
773  * \param worker The worker thread executing tasks.
774  */
775 static void worker_active(struct worker_thread *worker)
776 {
777         int alive = 1;
778         while (alive) {
779                 if (threadpool_execute(worker->pool)) {
780                         alive = worker_idle(worker);
781                 }
782         }
783 }
784
785 /*!
786  * \brief Idle function for worker threads
787  *
788  * The worker waits here until it gets told by the threadpool
789  * to wake up.
790  *
791  * \param worker The idle worker
792  * \retval 0 The thread is being woken up so that it can conclude.
793  * \retval non-zero The thread is being woken up to do more work.
794  */
795 static int worker_idle(struct worker_thread *worker)
796 {
797         SCOPED_MUTEX(lock, &worker->lock);
798         if (worker->state != ALIVE) {
799                 return 0;
800         }
801         threadpool_active_thread_idle(worker->pool, worker);
802         while (!worker->wake_up) {
803                 ast_cond_wait(&worker->cond, lock);
804         }
805         worker->wake_up = 0;
806         return worker->state == ALIVE;
807 }
808
809 /*!
810  * \brief Change a worker's state
811  *
812  * The threadpool calls into this function in order to let a worker know
813  * how it should proceed.
814  */
815 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
816 {
817         SCOPED_MUTEX(lock, &worker->lock);
818         worker->state = state;
819         worker->wake_up = 1;
820         ast_cond_signal(&worker->cond);
821 }
822