1f1812a648da8c1f108291d2ceafb1434c4881eb
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*! 
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*! 
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*! 
49          * \brief The main taskprocessor
50          * 
51          * Tasks that are queued in this taskprocessor are
52          * doled out to the worker threads. Worker threads that
53          * execute tasks from the threadpool are executing tasks
54          * in this taskprocessor.
55          *
56          * The threadpool itself is actually the private data for
57          * this taskprocessor's listener. This way, as taskprocessor
58          * changes occur, the threadpool can alert its listeners
59          * appropriately.
60          */
61         struct ast_taskprocessor *tps;
62         /*!
63          * \brief The control taskprocessor
64          *
65          * This is a standard taskprocessor that uses the default
66          * taskprocessor listener. In other words, all tasks queued to
67          * this taskprocessor have a single thread that executes the
68          * tasks.
69          *
70          * All tasks that modify the state of the threadpool and all tasks
71          * that call out to threadpool listeners are pushed to this
72          * taskprocessor.
73          *
74          * For instance, when the threadpool changes sizes, a task is put
75          * into this taskprocessor to do so. When it comes time to tell the
76          * threadpool listener that worker threads have changed state,
77          * the task is placed in this taskprocessor.
78          *
79          * This is done for three main reasons
80          * 1) It ensures that listeners are given an accurate portrayal
81          * of the threadpool's current state. In other words, when a listener
82          * gets told a count of active and idle threads, it does not
83          * need to worry that internal state of the threadpool might be different
84          * from what it has been told.
85          * 2) It minimizes the locking required in both the threadpool and in
86          * threadpool listener's callbacks.
87          * 3) It ensures that listener callbacks are called in the same order
88          * that the threadpool had its state change.
89          */
90         struct ast_taskprocessor *control_tps;
91 };
92
93 /*!
94  * \brief states for worker threads
95  */
96 enum worker_state {
97         /*! The worker is either active or idle */
98         ALIVE,
99         /*! The worker has been asked to shut down. */
100         DEAD,
101 };
102
103 /* Worker thread forward declarations. See definitions for documentation */
104 struct worker_thread;
105 static int worker_thread_hash(const void *obj, int flags);
106 static int worker_thread_cmp(void *obj, void *arg, int flags);
107 static void worker_thread_destroy(void *obj);
108 static void worker_active(struct worker_thread *worker);
109 static void *worker_start(void *arg);
110 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
111 static int worker_idle(struct worker_thread *worker);
112 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
113 static void worker_shutdown(struct worker_thread *worker);
114
115 /*!
116  * \brief Notify the threadpool listener that the state has changed.
117  *
118  * This notifies the threadpool listener via its state_changed callback.
119  * \param pool The threadpool whose state has changed
120  */
121 static void threadpool_send_state_changed(struct ast_threadpool *pool)
122 {
123         int active_size = ao2_container_count(pool->active_threads);
124         int idle_size = ao2_container_count(pool->idle_threads);
125
126         pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size);
127 }
128
129 /*!
130  * \brief Struct used for queued operations involving worker state changes
131  */
132 struct thread_worker_pair {
133         /*! Threadpool that contains the worker whose state has changed */
134         struct ast_threadpool *pool;
135         /*! Worker whose state has changed */
136         struct worker_thread *worker;
137 };
138
139 /*!
140  * \brief Destructor for thread_worker_pair
141  */
142 static void thread_worker_pair_destructor(void *obj)
143 {
144         struct thread_worker_pair *pair = obj;
145         ao2_ref(pair->worker, -1);
146 }
147
148 /*!
149  * \brief Allocate and initialize a thread_worker_pair
150  * \param pool Threadpool to assign to the thread_worker_pair
151  * \param worker Worker thread to assign to the thread_worker_pair
152  */
153 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
154                 struct worker_thread *worker)
155 {
156         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
157         if (!pair) {
158                 return NULL;
159         }
160         pair->pool = pool;
161         ao2_ref(worker, +1);
162         pair->worker = worker;
163         return pair;
164 }
165
166 /*!
167  * \brief Move a worker thread from the active container to the idle container.
168  *
169  * This function is called from the threadpool's control taskprocessor thread.
170  * \param data A thread_worker_pair containing the threadpool and the worker to move.
171  * \return 0
172  */
173 static int queued_active_thread_idle(void *data)
174 {
175         struct thread_worker_pair *pair = data;
176
177         ao2_link(pair->pool->idle_threads, pair->worker);
178         ao2_unlink(pair->pool->active_threads, pair->worker);
179
180         threadpool_send_state_changed(pair->pool);
181
182         ao2_ref(pair, -1);
183         return 0;
184 }
185
186 /*!
187  * \brief Queue a task to move a thread from the active list to the idle list
188  *
189  * This is called by a worker thread when it runs out of tasks to perform and
190  * goes idle.
191  * \param pool The threadpool to which the worker belongs
192  * \param worker The worker thread that has gone idle
193  */
194 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
195                 struct worker_thread *worker)
196 {
197         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
198         if (!pair) {
199                 return;
200         }
201         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
202 }
203
204 /*!
205  * \brief Execute a task in the threadpool
206  * 
207  * This is the function that worker threads call in order to execute tasks
208  * in the threadpool
209  *
210  * \param pool The pool to which the tasks belong.
211  * \retval 0 Either the pool has been shut down or there are no tasks.
212  * \retval 1 There are still tasks remaining in the pool.
213  */
214 static int threadpool_execute(struct ast_threadpool *pool)
215 {
216         return ast_taskprocessor_execute(pool->tps);
217 }
218
219 /*!
220  * \brief Destroy a threadpool's components.
221  *
222  * This is the destructor called automatically when the threadpool's
223  * reference count reaches zero. This is not to be confused with
224  * threadpool_destroy.
225  *
226  * By the time this actually gets called, most of the cleanup has already
227  * been done in the pool. The only thing left to do is to release the
228  * final reference to the threadpool listener.
229  *
230  * \param obj The pool to destroy
231  */
232 static void threadpool_destructor(void *obj)
233 {
234         struct ast_threadpool *pool = obj;
235         /* XXX Probably should let the listener know we're being destroyed? */
236
237         /* Threads should all be shut down by now, so this should be a painless
238          * operation
239          */
240         ao2_cleanup(pool->listener);
241 }
242
243 /*
244  * \brief Allocate a threadpool
245  *
246  * This is implemented as a taskprocessor listener's alloc callback. This
247  * is because the threadpool exists as the private data on a taskprocessor
248  * listener.
249  *
250  * \param listener The taskprocessor listener where the threadpool will live.
251  * \retval NULL Could not initialize threadpool properly
252  * \retval non-NULL The newly-allocated threadpool
253  */
254 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
255 {
256         RAII_VAR(struct ast_threadpool *, pool,
257                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
258
259         pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
260         if (!pool->control_tps) {
261                 return NULL;
262         }
263         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
264         if (!pool->active_threads) {
265                 return NULL;
266         }
267         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
268         if (!pool->idle_threads) {
269                 return NULL;
270         }
271
272         pool->tps = listener->tps;
273
274         ao2_ref(pool, +1);
275         return pool;
276 }
277
278 /*!
279  * \brief helper used for queued task when tasks are pushed
280  */
281 struct task_pushed_data {
282         /*! Pool into which a task was pushed */
283         struct ast_threadpool *pool;
284         /*! Indicator of whether the pool had no tasks prior to the new task being added */
285         int was_empty;
286 };
287
288 /*!
289  * \brief Allocate and initialize a task_pushed_data
290  * \param pool The threadpool to set in the task_pushed_data
291  * \param was_empty The was_empty value to set in the task_pushed_data
292  * \retval NULL Unable to allocate task_pushed_data
293  * \retval non-NULL The newly-allocated task_pushed_data
294  */
295 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
296                 int was_empty)
297 {
298         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
299
300         if (!tpd) {
301                 return NULL;
302         }
303         tpd->pool = pool;
304         tpd->was_empty = was_empty;
305         return tpd;
306 }
307
308 /*!
309  * \brief Activate idle threads
310  *
311  * This function always returns CMP_MATCH because all threads that this
312  * function acts on need to be seen as matches so they are unlinked from the
313  * list of idle threads.
314  *
315  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
316  * \param obj The worker to activate
317  * \param arg The pool where the worker belongs
318  * \retval CMP_MATCH
319  */
320 static int activate_threads(void *obj, void *arg, int flags)
321 {
322         struct worker_thread *worker = obj;
323         struct ast_threadpool *pool = arg;
324
325         ao2_link(pool->active_threads, worker);
326         worker_set_state(worker, ALIVE);
327         return CMP_MATCH;
328 }
329
330 /*!
331  * \brief Queue task called when tasks are pushed into the threadpool
332  *
333  * This function first calls into the threadpool's listener to let it know
334  * that a task has been pushed. It then wakes up all idle threads and moves
335  * them into the active thread container.
336  * \param data A task_pushed_data
337  * \return 0
338  */
339 static int handle_task_pushed(void *data)
340 {
341         struct task_pushed_data *tpd = data;
342         struct ast_threadpool *pool = tpd->pool;
343         int was_empty = tpd->was_empty;
344
345         pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
346         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
347                         activate_threads, pool);
348         ao2_ref(tpd, -1);
349         return 0;
350 }
351
352 /*!
353  * \brief Taskprocessor listener callback called when a task is added
354  *
355  * The threadpool uses this opportunity to queue a task on its control taskprocessor
356  * in order to activate idle threads and notify the threadpool listener that the
357  * task has been pushed.
358  * \param listener The taskprocessor listener. The threadpool is the listener's private data
359  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
360  */
361 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
362                 int was_empty)
363 {
364         struct ast_threadpool *pool = listener->private_data;
365         struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
366
367         if (!tpd) {
368                 return;
369         }
370
371         ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
372 }
373
374 /*!
375  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
376  *
377  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
378  * \param data The pool that has become empty
379  * \return 0
380  */
381 static int handle_emptied(void *data)
382 {
383         struct ast_threadpool *pool = data;
384
385         pool->listener->callbacks->emptied(pool->listener);
386         return 0;
387 }
388
389 /*!
390  * \brief Taskprocessor listener emptied callback
391  *
392  * The threadpool queues a task to let the threadpool listener know that
393  * the threadpool no longer contains any tasks.
394  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
395  */
396 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
397 {
398         struct ast_threadpool *pool = listener->private_data;
399
400         ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
401 }
402
403 /*!
404  * \brief Taskprocessor listener shutdown callback
405  *
406  * The threadpool will shut down and destroy all of its worker threads when
407  * this is called back. By the time this gets called, the taskprocessor's
408  * control taskprocessor has already been destroyed. Therefore there is no risk
409  * in outright destroying the worker threads here.
410  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
411  */
412 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
413 {
414         struct ast_threadpool *pool = listener->private_data;
415
416         ao2_cleanup(pool->active_threads);
417         ao2_cleanup(pool->idle_threads);
418 }
419
420 /*!
421  * \brief Taskprocessor listener destroy callback
422  *
423  * Since the threadpool is an ao2 object, all that is necessary is to
424  * decrease the refcount. Since the control taskprocessor should already
425  * be destroyed by this point, this should be the final reference to the
426  * threadpool.
427  *
428  * \param private_data The threadpool to destroy
429  */
430 static void threadpool_destroy(void *private_data)
431 {
432         struct ast_threadpool *pool = private_data;
433         ao2_cleanup(pool);
434 }
435
436 /*!
437  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
438  */
439 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
440         .alloc = threadpool_alloc,
441         .task_pushed = threadpool_tps_task_pushed,
442         .emptied = threadpool_tps_emptied,
443         .shutdown = threadpool_tps_shutdown,
444         .destroy = threadpool_destroy,
445 };
446
447 /*!
448  * \brief Add threads to the threadpool
449  *
450  * This function is called from the threadpool's control taskprocessor thread.
451  * \param pool The pool that is expanding
452  * \delta The number of threads to add to the pool
453  */
454 static void grow(struct ast_threadpool *pool, int delta)
455 {
456         int i;
457         for (i = 0; i < delta; ++i) {
458                 struct worker_thread *worker = worker_thread_alloc(pool);
459                 if (!worker) {
460                         return;
461                 }
462                 ao2_link(pool->active_threads, worker);
463         }
464 }
465
466 /*!
467  * \brief ao2 callback to kill a set number of threads.
468  *
469  * Threads will be unlinked from the container as long as the
470  * counter has not reached zero. The counter is decremented with
471  * each thread that is removed.
472  * \param obj The worker thread up for possible destruction
473  * \param arg The counter
474  * \param flags Unused
475  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
476  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
477  */
478 static int kill_threads(void *obj, void *arg, int flags)
479 {
480         int *num_to_kill = arg;
481
482         if ((*num_to_kill)-- > 0) {
483                 return CMP_MATCH;
484         } else {
485                 return CMP_STOP;
486         }
487 }
488
489 /*!
490  * \brief Remove threads from the threadpool
491  *
492  * The preference is to kill idle threads. However, if there are
493  * more threads to remove than there are idle threads, then active
494  * threads will be removed too.
495  *
496  * This function is called from the threadpool control taskprocessor thread.
497  *
498  * \param pool The threadpool to remove threads from
499  * \param delta The number of threads to remove
500  */
501 static void shrink(struct ast_threadpool *pool, int delta)
502 {
503         int idle_threads = ao2_container_count(pool->idle_threads);
504         int idle_threads_to_kill = MIN(delta, idle_threads);
505         int active_threads_to_kill = delta - idle_threads_to_kill;
506
507         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
508                         kill_threads, &idle_threads_to_kill);
509
510         ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
511                         kill_threads, &active_threads_to_kill);
512 }
513
514 /*!
515  * \brief Helper struct used for queued operations that change the size of the threadpool
516  */
517 struct set_size_data {
518         /*! The pool whose size is to change */
519         struct ast_threadpool *pool;
520         /*! The requested new size of the pool */
521         unsigned int size;
522 };
523
524 /*!
525  * \brief Allocate and initialize a set_size_data
526  * \param pool The pool for the set_size_data
527  * \param size The size to store in the set_size_data
528  */
529 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
530                 unsigned int size)
531 {
532         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
533         if (!ssd) {
534                 return NULL;
535         }
536
537         ssd->pool = pool;
538         ssd->size = size;
539         return ssd;
540 }
541
542 /*!
543  * \brief Change the size of the threadpool
544  *
545  * This can either result in shrinking or growing the threadpool depending
546  * on the new desired size and the current size.
547  *
548  * This function is run from the threadpool control taskprocessor thread
549  *
550  * \param data A set_size_data used for determining how to act
551  * \return 0
552  */
553 static int queued_set_size(void *data)
554 {
555         struct set_size_data *ssd = data;
556         struct ast_threadpool *pool = ssd->pool;
557         unsigned int new_size = ssd->size;
558         unsigned int current_size = ao2_container_count(pool->active_threads) +
559                 ao2_container_count(pool->idle_threads);
560
561         if (current_size == new_size) {
562                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
563                                 new_size, current_size);
564                 return 0;
565         }
566
567         if (current_size < new_size) {
568                 grow(pool, new_size - current_size);
569         } else {
570                 shrink(pool, current_size - new_size);
571         }
572
573         threadpool_send_state_changed(pool);
574         ao2_ref(ssd, -1);
575         return 0;
576 }
577
578 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
579 {
580         struct set_size_data *ssd;
581
582         ssd = set_size_data_alloc(pool, size);
583         if (!ssd) {
584                 return;
585         }
586
587         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
588 }
589
590 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
591 {
592         struct ast_threadpool *pool;
593         struct ast_taskprocessor *tps;
594         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
595                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
596                         ao2_cleanup);
597
598         if (!tps_listener) {
599                 return NULL;
600         }
601
602         tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
603
604         if (!tps) {
605                 return NULL;
606         }
607
608         pool = tps_listener->private_data;
609         ast_threadpool_set_size(pool, initial_size);
610         return pool;
611 }
612
613 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
614 {
615         return ast_taskprocessor_push(pool->tps, task, data);
616 }
617
618 void ast_threadpool_shutdown(struct ast_threadpool *pool)
619 {
620         /* Shut down the taskprocessors and everything else just
621          * takes care of itself via the taskprocessor callbacks
622          */
623         ast_taskprocessor_unreference(pool->control_tps);
624         ast_taskprocessor_unreference(pool->tps);
625 }
626
627 /*!
628  * A thread that executes threadpool tasks
629  */
630 struct worker_thread {
631         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
632         int id;
633         /*! Condition used in conjunction with state changes */
634         ast_cond_t cond;
635         /*! Lock used alongside the condition for state changes */
636         ast_mutex_t lock;
637         /*! The actual thread that is executing tasks */
638         pthread_t thread;
639         /*! A pointer to the threadpool. Needed to be able to execute tasks */
640         struct ast_threadpool *pool;
641         /*! The current state of the worker thread */
642         enum worker_state state;
643         /*! A boolean used to determine if an idle thread should become active */
644         int wake_up;
645 };
646
647 /*!
648  * A monotonically increasing integer used for worker
649  * thread identification.
650  */
651 static int worker_id_counter;
652
653 static int worker_thread_hash(const void *obj, int flags)
654 {
655         const struct worker_thread *worker = obj;
656
657         return worker->id;
658 }
659
660 static int worker_thread_cmp(void *obj, void *arg, int flags)
661 {
662         struct worker_thread *worker1 = obj;
663         struct worker_thread *worker2 = arg;
664
665         return worker1->id == worker2->id ? CMP_MATCH : 0;
666 }
667
668 /*!
669  * \brief shut a worker thread down
670  *
671  * Set the worker dead and then wait for its thread
672  * to finish executing.
673  *
674  * \param worker The worker thread to shut down
675  */
676 static void worker_shutdown(struct worker_thread *worker)
677 {
678         worker_set_state(worker, DEAD);
679         if (worker->thread != AST_PTHREADT_NULL) {
680                 pthread_join(worker->thread, NULL);
681                 worker->thread = AST_PTHREADT_NULL;
682         }
683 }
684
685 /*!
686  * \brief Worker thread destructor
687  *
688  * Called automatically when refcount reaches 0. Shuts
689  * down the worker thread and destroys its component
690  * parts
691  */
692 static void worker_thread_destroy(void *obj)
693 {
694         struct worker_thread *worker = obj;
695         worker_shutdown(worker);
696         ast_mutex_destroy(&worker->lock);
697         ast_cond_destroy(&worker->cond);
698 }
699
700 /*!
701  * \brief start point for worker threads
702  *
703  * Worker threads start in the active state but may
704  * immediately go idle if there is no work to be
705  * done
706  *
707  * \param arg The worker thread
708  * \retval NULL
709  */
710 static void *worker_start(void *arg)
711 {
712         struct worker_thread *worker = arg;
713
714         worker_active(worker);
715         return NULL;
716 }
717
718 /*!
719  * \brief Allocate and initialize a new worker thread
720  *
721  * This will create, initialize, and start the thread.
722  *
723  * \param pool The threadpool to which the worker will be added
724  * \retval NULL Failed to allocate or start the worker thread
725  * \retval non-NULL The newly-created worker thread
726  */
727 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
728 {
729         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
730         if (!worker) {
731                 return NULL;
732         }
733         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
734         ast_mutex_init(&worker->lock);
735         ast_cond_init(&worker->cond, NULL);
736         worker->pool = pool;
737         worker->thread = AST_PTHREADT_NULL;
738         worker->state = ALIVE;
739         if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
740                 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
741                 ao2_ref(worker, -1);
742                 return NULL;
743         }
744         return worker;
745 }
746
747 /*!
748  * \brief Active loop for worker threads
749  *
750  * The worker will stay in this loop for its lifetime,
751  * executing tasks as they become available. If there
752  * are no tasks currently available, then the thread
753  * will go idle.
754  *
755  * \param worker The worker thread executing tasks.
756  */
757 static void worker_active(struct worker_thread *worker)
758 {
759         int alive = 1;
760         while (alive) {
761                 if (threadpool_execute(worker->pool)) {
762                         alive = worker_idle(worker);
763                 }
764         }
765 }
766
767 /*!
768  * \brief Idle function for worker threads
769  *
770  * The worker waits here until it gets told by the threadpool
771  * to wake up.
772  *
773  * \param worker The idle worker
774  * \retval 0 The thread is being woken up so that it can conclude.
775  * \retval non-zero The thread is being woken up to do more work.
776  */
777 static int worker_idle(struct worker_thread *worker)
778 {
779         SCOPED_MUTEX(lock, &worker->lock);
780         if (worker->state != ALIVE) {
781                 return 0;
782         }
783         threadpool_active_thread_idle(worker->pool, worker);
784         while (!worker->wake_up) {
785                 ast_cond_wait(&worker->cond, lock);
786         }
787         worker->wake_up = 0;
788         return worker->state == ALIVE;
789 }
790
791 /*!
792  * \brief Change a worker's state
793  *
794  * The threadpool calls into this function in order to let a worker know
795  * how it should proceed.
796  */
797 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
798 {
799         SCOPED_MUTEX(lock, &worker->lock);
800         worker->state = state;
801         worker->wake_up = 1;
802         ast_cond_signal(&worker->cond);
803 }
804