Add initial simple threadpool test.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*! 
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*! 
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*! 
49          * \brief The main taskprocessor
50          * 
51          * Tasks that are queued in this taskprocessor are
52          * doled out to the worker threads. Worker threads that
53          * execute tasks from the threadpool are executing tasks
54          * in this taskprocessor.
55          *
56          * The threadpool itself is actually the private data for
57          * this taskprocessor's listener. This way, as taskprocessor
58          * changes occur, the threadpool can alert its listeners
59          * appropriately.
60          */
61         struct ast_taskprocessor *tps;
62         /*!
63          * \brief The control taskprocessor
64          *
65          * This is a standard taskprocessor that uses the default
66          * taskprocessor listener. In other words, all tasks queued to
67          * this taskprocessor have a single thread that executes the
68          * tasks.
69          *
70          * All tasks that modify the state of the threadpool and all tasks
71          * that call out to threadpool listeners are pushed to this
72          * taskprocessor.
73          *
74          * For instance, when the threadpool changes sizes, a task is put
75          * into this taskprocessor to do so. When it comes time to tell the
76          * threadpool listener that worker threads have changed state,
77          * the task is placed in this taskprocessor.
78          *
79          * This is done for three main reasons
80          * 1) It ensures that listeners are given an accurate portrayal
81          * of the threadpool's current state. In other words, when a listener
82          * gets told a count of active and idle threads, it does not
83          * need to worry that internal state of the threadpool might be different
84          * from what it has been told.
85          * 2) It minimizes the locking required in both the threadpool and in
86          * threadpool listener's callbacks.
87          * 3) It ensures that listener callbacks are called in the same order
88          * that the threadpool had its state change.
89          */
90         struct ast_taskprocessor *control_tps;
91 };
92
93 /*!
94  * \brief states for worker threads
95  */
96 enum worker_state {
97         /*! The worker is either active or idle */
98         ALIVE,
99         /*! The worker has been asked to shut down. */
100         DEAD,
101 };
102
103 /* Worker thread forward declarations. See definitions for documentation */
104 struct worker_thread;
105 static int worker_thread_hash(const void *obj, int flags);
106 static int worker_thread_cmp(void *obj, void *arg, int flags);
107 static void worker_thread_destroy(void *obj);
108 static void worker_active(struct worker_thread *worker);
109 static void *worker_start(void *arg);
110 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
111 static int worker_idle(struct worker_thread *worker);
112 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
113 static void worker_shutdown(struct worker_thread *worker);
114
115 /*!
116  * \brief Notify the threadpool listener that the state has changed.
117  *
118  * This notifies the threadpool listener via its state_changed callback.
119  * \param pool The threadpool whose state has changed
120  */
121 static void threadpool_send_state_changed(struct ast_threadpool *pool)
122 {
123         int active_size = ao2_container_count(pool->active_threads);
124         int idle_size = ao2_container_count(pool->idle_threads);
125
126         pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
127 }
128
129 /*!
130  * \brief Struct used for queued operations involving worker state changes
131  */
132 struct thread_worker_pair {
133         /*! Threadpool that contains the worker whose state has changed */
134         struct ast_threadpool *pool;
135         /*! Worker whose state has changed */
136         struct worker_thread *worker;
137 };
138
139 /*!
140  * \brief Destructor for thread_worker_pair
141  */
142 static void thread_worker_pair_destructor(void *obj)
143 {
144         struct thread_worker_pair *pair = obj;
145         ao2_ref(pair->worker, -1);
146 }
147
148 /*!
149  * \brief Allocate and initialize a thread_worker_pair
150  * \param pool Threadpool to assign to the thread_worker_pair
151  * \param worker Worker thread to assign to the thread_worker_pair
152  */
153 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
154                 struct worker_thread *worker)
155 {
156         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
157         if (!pair) {
158                 return NULL;
159         }
160         pair->pool = pool;
161         ao2_ref(worker, +1);
162         pair->worker = worker;
163         return pair;
164 }
165
166 /*!
167  * \brief Move a worker thread from the active container to the idle container.
168  *
169  * This function is called from the threadpool's control taskprocessor thread.
170  * \param data A thread_worker_pair containing the threadpool and the worker to move.
171  * \return 0
172  */
173 static int queued_active_thread_idle(void *data)
174 {
175         struct thread_worker_pair *pair = data;
176
177         ao2_link(pair->pool->idle_threads, pair->worker);
178         ao2_unlink(pair->pool->active_threads, pair->worker);
179
180         threadpool_send_state_changed(pair->pool);
181
182         ao2_ref(pair, -1);
183         return 0;
184 }
185
186 /*!
187  * \brief Queue a task to move a thread from the active list to the idle list
188  *
189  * This is called by a worker thread when it runs out of tasks to perform and
190  * goes idle.
191  * \param pool The threadpool to which the worker belongs
192  * \param worker The worker thread that has gone idle
193  */
194 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
195                 struct worker_thread *worker)
196 {
197         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
198         if (!pair) {
199                 return;
200         }
201         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
202 }
203
204 /*!
205  * \brief Execute a task in the threadpool
206  * 
207  * This is the function that worker threads call in order to execute tasks
208  * in the threadpool
209  *
210  * \param pool The pool to which the tasks belong.
211  * \retval 0 Either the pool has been shut down or there are no tasks.
212  * \retval 1 There are still tasks remaining in the pool.
213  */
214 static int threadpool_execute(struct ast_threadpool *pool)
215 {
216         return ast_taskprocessor_execute(pool->tps);
217 }
218
219 /*!
220  * \brief Destroy a threadpool's components.
221  *
222  * This is the destructor called automatically when the threadpool's
223  * reference count reaches zero. This is not to be confused with
224  * threadpool_destroy.
225  *
226  * By the time this actually gets called, most of the cleanup has already
227  * been done in the pool. The only thing left to do is to release the
228  * final reference to the threadpool listener.
229  *
230  * \param obj The pool to destroy
231  */
232 static void threadpool_destructor(void *obj)
233 {
234         struct ast_threadpool *pool = obj;
235         ao2_cleanup(pool->listener);
236 }
237
238 /*
239  * \brief Allocate a threadpool
240  *
241  * This is implemented as a taskprocessor listener's alloc callback. This
242  * is because the threadpool exists as the private data on a taskprocessor
243  * listener.
244  *
245  * \param listener The taskprocessor listener where the threadpool will live.
246  * \retval NULL Could not initialize threadpool properly
247  * \retval non-NULL The newly-allocated threadpool
248  */
249 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
250 {
251         RAII_VAR(struct ast_threadpool *, pool,
252                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
253
254         pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
255         if (!pool->control_tps) {
256                 return NULL;
257         }
258         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
259         if (!pool->active_threads) {
260                 return NULL;
261         }
262         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
263         if (!pool->idle_threads) {
264                 return NULL;
265         }
266
267         ao2_ref(pool, +1);
268         return pool;
269 }
270
271 /*!
272  * \brief helper used for queued task when tasks are pushed
273  */
274 struct task_pushed_data {
275         /*! Pool into which a task was pushed */
276         struct ast_threadpool *pool;
277         /*! Indicator of whether the pool had no tasks prior to the new task being added */
278         int was_empty;
279 };
280
281 /*!
282  * \brief Allocate and initialize a task_pushed_data
283  * \param pool The threadpool to set in the task_pushed_data
284  * \param was_empty The was_empty value to set in the task_pushed_data
285  * \retval NULL Unable to allocate task_pushed_data
286  * \retval non-NULL The newly-allocated task_pushed_data
287  */
288 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
289                 int was_empty)
290 {
291         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
292
293         if (!tpd) {
294                 return NULL;
295         }
296         tpd->pool = pool;
297         tpd->was_empty = was_empty;
298         return tpd;
299 }
300
301 /*!
302  * \brief Activate idle threads
303  *
304  * This function always returns CMP_MATCH because all threads that this
305  * function acts on need to be seen as matches so they are unlinked from the
306  * list of idle threads.
307  *
308  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
309  * \param obj The worker to activate
310  * \param arg The pool where the worker belongs
311  * \retval CMP_MATCH
312  */
313 static int activate_threads(void *obj, void *arg, int flags)
314 {
315         struct worker_thread *worker = obj;
316         struct ast_threadpool *pool = arg;
317
318         ao2_link(pool->active_threads, worker);
319         worker_set_state(worker, ALIVE);
320         return CMP_MATCH;
321 }
322
323 /*!
324  * \brief Queue task called when tasks are pushed into the threadpool
325  *
326  * This function first calls into the threadpool's listener to let it know
327  * that a task has been pushed. It then wakes up all idle threads and moves
328  * them into the active thread container.
329  * \param data A task_pushed_data
330  * \return 0
331  */
332 static int handle_task_pushed(void *data)
333 {
334         struct task_pushed_data *tpd = data;
335         struct ast_threadpool *pool = tpd->pool;
336         int was_empty = tpd->was_empty;
337
338         pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
339         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
340                         activate_threads, pool);
341         ao2_ref(tpd, -1);
342         return 0;
343 }
344
345 /*!
346  * \brief Taskprocessor listener callback called when a task is added
347  *
348  * The threadpool uses this opportunity to queue a task on its control taskprocessor
349  * in order to activate idle threads and notify the threadpool listener that the
350  * task has been pushed.
351  * \param listener The taskprocessor listener. The threadpool is the listener's private data
352  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
353  */
354 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
355                 int was_empty)
356 {
357         struct ast_threadpool *pool = listener->private_data;
358         struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
359
360         if (!tpd) {
361                 return;
362         }
363
364         ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
365 }
366
367 /*!
368  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
369  *
370  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
371  * \param data The pool that has become empty
372  * \return 0
373  */
374 static int handle_emptied(void *data)
375 {
376         struct ast_threadpool *pool = data;
377
378         pool->listener->callbacks->emptied(pool, pool->listener);
379         return 0;
380 }
381
382 /*!
383  * \brief Taskprocessor listener emptied callback
384  *
385  * The threadpool queues a task to let the threadpool listener know that
386  * the threadpool no longer contains any tasks.
387  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
388  */
389 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
390 {
391         struct ast_threadpool *pool = listener->private_data;
392
393         ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
394 }
395
396 /*!
397  * \brief Taskprocessor listener shutdown callback
398  *
399  * The threadpool will shut down and destroy all of its worker threads when
400  * this is called back. By the time this gets called, the taskprocessor's
401  * control taskprocessor has already been destroyed. Therefore there is no risk
402  * in outright destroying the worker threads here.
403  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
404  */
405 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
406 {
407         struct ast_threadpool *pool = listener->private_data;
408
409         ao2_cleanup(pool->active_threads);
410         ao2_cleanup(pool->idle_threads);
411 }
412
413 /*!
414  * \brief Taskprocessor listener destroy callback
415  *
416  * Since the threadpool is an ao2 object, all that is necessary is to
417  * decrease the refcount. Since the control taskprocessor should already
418  * be destroyed by this point, this should be the final reference to the
419  * threadpool.
420  *
421  * \param private_data The threadpool to destroy
422  */
423 static void threadpool_destroy(void *private_data)
424 {
425         struct ast_threadpool *pool = private_data;
426         ao2_cleanup(pool);
427 }
428
429 /*!
430  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
431  */
432 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
433         .alloc = threadpool_alloc,
434         .task_pushed = threadpool_tps_task_pushed,
435         .emptied = threadpool_tps_emptied,
436         .shutdown = threadpool_tps_shutdown,
437         .destroy = threadpool_destroy,
438 };
439
440 /*!
441  * \brief Add threads to the threadpool
442  *
443  * This function is called from the threadpool's control taskprocessor thread.
444  * \param pool The pool that is expanding
445  * \delta The number of threads to add to the pool
446  */
447 static void grow(struct ast_threadpool *pool, int delta)
448 {
449         int i;
450         for (i = 0; i < delta; ++i) {
451                 struct worker_thread *worker = worker_thread_alloc(pool);
452                 if (!worker) {
453                         return;
454                 }
455                 ao2_link(pool->active_threads, worker);
456         }
457 }
458
459 /*!
460  * \brief ao2 callback to kill a set number of threads.
461  *
462  * Threads will be unlinked from the container as long as the
463  * counter has not reached zero. The counter is decremented with
464  * each thread that is removed.
465  * \param obj The worker thread up for possible destruction
466  * \param arg The counter
467  * \param flags Unused
468  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
469  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
470  */
471 static int kill_threads(void *obj, void *arg, int flags)
472 {
473         int *num_to_kill = arg;
474
475         if ((*num_to_kill)-- > 0) {
476                 return CMP_MATCH;
477         } else {
478                 return CMP_STOP;
479         }
480 }
481
482 /*!
483  * \brief Remove threads from the threadpool
484  *
485  * The preference is to kill idle threads. However, if there are
486  * more threads to remove than there are idle threads, then active
487  * threads will be removed too.
488  *
489  * This function is called from the threadpool control taskprocessor thread.
490  *
491  * \param pool The threadpool to remove threads from
492  * \param delta The number of threads to remove
493  */
494 static void shrink(struct ast_threadpool *pool, int delta)
495 {
496         int idle_threads = ao2_container_count(pool->idle_threads);
497         int idle_threads_to_kill = MIN(delta, idle_threads);
498         int active_threads_to_kill = delta - idle_threads_to_kill;
499
500         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
501                         kill_threads, &idle_threads_to_kill);
502
503         ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
504                         kill_threads, &active_threads_to_kill);
505 }
506
507 /*!
508  * \brief Helper struct used for queued operations that change the size of the threadpool
509  */
510 struct set_size_data {
511         /*! The pool whose size is to change */
512         struct ast_threadpool *pool;
513         /*! The requested new size of the pool */
514         unsigned int size;
515 };
516
517 /*!
518  * \brief Allocate and initialize a set_size_data
519  * \param pool The pool for the set_size_data
520  * \param size The size to store in the set_size_data
521  */
522 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
523                 unsigned int size)
524 {
525         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
526         if (!ssd) {
527                 return NULL;
528         }
529
530         ssd->pool = pool;
531         ssd->size = size;
532         return ssd;
533 }
534
535 /*!
536  * \brief Change the size of the threadpool
537  *
538  * This can either result in shrinking or growing the threadpool depending
539  * on the new desired size and the current size.
540  *
541  * This function is run from the threadpool control taskprocessor thread
542  *
543  * \param data A set_size_data used for determining how to act
544  * \return 0
545  */
546 static int queued_set_size(void *data)
547 {
548         struct set_size_data *ssd = data;
549         struct ast_threadpool *pool = ssd->pool;
550         unsigned int new_size = ssd->size;
551         unsigned int current_size = ao2_container_count(pool->active_threads) +
552                 ao2_container_count(pool->idle_threads);
553
554         if (current_size == new_size) {
555                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
556                                 new_size, current_size);
557                 return 0;
558         }
559
560         if (current_size < new_size) {
561                 grow(pool, new_size - current_size);
562         } else {
563                 shrink(pool, current_size - new_size);
564         }
565
566         threadpool_send_state_changed(pool);
567         ao2_ref(ssd, -1);
568         return 0;
569 }
570
571 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
572 {
573         struct set_size_data *ssd;
574
575         ssd = set_size_data_alloc(pool, size);
576         if (!ssd) {
577                 return;
578         }
579
580         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
581 }
582
583 static void listener_destructor(void *obj)
584 {
585         struct ast_threadpool_listener *listener = obj;
586
587         listener->callbacks->destroy(listener->private_data);
588 }
589
590 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
591                 const struct ast_threadpool_listener_callbacks *callbacks)
592 {
593         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
594         if (!listener) {
595                 return NULL;
596         }
597         listener->callbacks = callbacks;
598         listener->private_data = listener->callbacks->alloc(listener);
599         if (!listener->private_data) {
600                 ao2_ref(listener, -1);
601                 return NULL;
602         }
603         return listener;
604 }
605
606 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
607 {
608         struct ast_threadpool *pool;
609         struct ast_taskprocessor *tps;
610         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
611                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
612                         ao2_cleanup);
613
614         if (!tps_listener) {
615                 return NULL;
616         }
617
618         tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
619
620         if (!tps) {
621                 return NULL;
622         }
623
624         pool = tps_listener->private_data;
625         pool->tps = tps;
626         ast_threadpool_set_size(pool, initial_size);
627         return pool;
628 }
629
630 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
631 {
632         return ast_taskprocessor_push(pool->tps, task, data);
633 }
634
635 void ast_threadpool_shutdown(struct ast_threadpool *pool)
636 {
637         /* Shut down the taskprocessors and everything else just
638          * takes care of itself via the taskprocessor callbacks
639          */
640         ast_taskprocessor_unreference(pool->control_tps);
641         ast_taskprocessor_unreference(pool->tps);
642 }
643
644 /*!
645  * A thread that executes threadpool tasks
646  */
647 struct worker_thread {
648         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
649         int id;
650         /*! Condition used in conjunction with state changes */
651         ast_cond_t cond;
652         /*! Lock used alongside the condition for state changes */
653         ast_mutex_t lock;
654         /*! The actual thread that is executing tasks */
655         pthread_t thread;
656         /*! A pointer to the threadpool. Needed to be able to execute tasks */
657         struct ast_threadpool *pool;
658         /*! The current state of the worker thread */
659         enum worker_state state;
660         /*! A boolean used to determine if an idle thread should become active */
661         int wake_up;
662 };
663
664 /*!
665  * A monotonically increasing integer used for worker
666  * thread identification.
667  */
668 static int worker_id_counter;
669
670 static int worker_thread_hash(const void *obj, int flags)
671 {
672         const struct worker_thread *worker = obj;
673
674         return worker->id;
675 }
676
677 static int worker_thread_cmp(void *obj, void *arg, int flags)
678 {
679         struct worker_thread *worker1 = obj;
680         struct worker_thread *worker2 = arg;
681
682         return worker1->id == worker2->id ? CMP_MATCH : 0;
683 }
684
685 /*!
686  * \brief shut a worker thread down
687  *
688  * Set the worker dead and then wait for its thread
689  * to finish executing.
690  *
691  * \param worker The worker thread to shut down
692  */
693 static void worker_shutdown(struct worker_thread *worker)
694 {
695         worker_set_state(worker, DEAD);
696         if (worker->thread != AST_PTHREADT_NULL) {
697                 pthread_join(worker->thread, NULL);
698                 worker->thread = AST_PTHREADT_NULL;
699         }
700 }
701
702 /*!
703  * \brief Worker thread destructor
704  *
705  * Called automatically when refcount reaches 0. Shuts
706  * down the worker thread and destroys its component
707  * parts
708  */
709 static void worker_thread_destroy(void *obj)
710 {
711         struct worker_thread *worker = obj;
712         worker_shutdown(worker);
713         ast_mutex_destroy(&worker->lock);
714         ast_cond_destroy(&worker->cond);
715 }
716
717 /*!
718  * \brief start point for worker threads
719  *
720  * Worker threads start in the active state but may
721  * immediately go idle if there is no work to be
722  * done
723  *
724  * \param arg The worker thread
725  * \retval NULL
726  */
727 static void *worker_start(void *arg)
728 {
729         struct worker_thread *worker = arg;
730
731         worker_active(worker);
732         return NULL;
733 }
734
735 /*!
736  * \brief Allocate and initialize a new worker thread
737  *
738  * This will create, initialize, and start the thread.
739  *
740  * \param pool The threadpool to which the worker will be added
741  * \retval NULL Failed to allocate or start the worker thread
742  * \retval non-NULL The newly-created worker thread
743  */
744 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
745 {
746         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
747         if (!worker) {
748                 return NULL;
749         }
750         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
751         ast_mutex_init(&worker->lock);
752         ast_cond_init(&worker->cond, NULL);
753         worker->pool = pool;
754         worker->thread = AST_PTHREADT_NULL;
755         worker->state = ALIVE;
756         if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
757                 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
758                 ao2_ref(worker, -1);
759                 return NULL;
760         }
761         return worker;
762 }
763
764 /*!
765  * \brief Active loop for worker threads
766  *
767  * The worker will stay in this loop for its lifetime,
768  * executing tasks as they become available. If there
769  * are no tasks currently available, then the thread
770  * will go idle.
771  *
772  * \param worker The worker thread executing tasks.
773  */
774 static void worker_active(struct worker_thread *worker)
775 {
776         int alive = 1;
777         while (alive) {
778                 if (threadpool_execute(worker->pool)) {
779                         alive = worker_idle(worker);
780                 }
781         }
782 }
783
784 /*!
785  * \brief Idle function for worker threads
786  *
787  * The worker waits here until it gets told by the threadpool
788  * to wake up.
789  *
790  * \param worker The idle worker
791  * \retval 0 The thread is being woken up so that it can conclude.
792  * \retval non-zero The thread is being woken up to do more work.
793  */
794 static int worker_idle(struct worker_thread *worker)
795 {
796         SCOPED_MUTEX(lock, &worker->lock);
797         if (worker->state != ALIVE) {
798                 return 0;
799         }
800         threadpool_active_thread_idle(worker->pool, worker);
801         while (!worker->wake_up) {
802                 ast_cond_wait(&worker->cond, lock);
803         }
804         worker->wake_up = 0;
805         return worker->state == ALIVE;
806 }
807
808 /*!
809  * \brief Change a worker's state
810  *
811  * The threadpool calls into this function in order to let a worker know
812  * how it should proceed.
813  */
814 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
815 {
816         SCOPED_MUTEX(lock, &worker->lock);
817         worker->state = state;
818         worker->wake_up = 1;
819         ast_cond_signal(&worker->cond);
820 }
821