Fix infinite looping and crash problem.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*! 
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*! 
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*! 
49          * \brief The main taskprocessor
50          * 
51          * Tasks that are queued in this taskprocessor are
52          * doled out to the worker threads. Worker threads that
53          * execute tasks from the threadpool are executing tasks
54          * in this taskprocessor.
55          *
56          * The threadpool itself is actually the private data for
57          * this taskprocessor's listener. This way, as taskprocessor
58          * changes occur, the threadpool can alert its listeners
59          * appropriately.
60          */
61         struct ast_taskprocessor *tps;
62         /*!
63          * \brief The control taskprocessor
64          *
65          * This is a standard taskprocessor that uses the default
66          * taskprocessor listener. In other words, all tasks queued to
67          * this taskprocessor have a single thread that executes the
68          * tasks.
69          *
70          * All tasks that modify the state of the threadpool and all tasks
71          * that call out to threadpool listeners are pushed to this
72          * taskprocessor.
73          *
74          * For instance, when the threadpool changes sizes, a task is put
75          * into this taskprocessor to do so. When it comes time to tell the
76          * threadpool listener that worker threads have changed state,
77          * the task is placed in this taskprocessor.
78          *
79          * This is done for three main reasons
80          * 1) It ensures that listeners are given an accurate portrayal
81          * of the threadpool's current state. In other words, when a listener
82          * gets told a count of active and idle threads, it does not
83          * need to worry that internal state of the threadpool might be different
84          * from what it has been told.
85          * 2) It minimizes the locking required in both the threadpool and in
86          * threadpool listener's callbacks.
87          * 3) It ensures that listener callbacks are called in the same order
88          * that the threadpool had its state change.
89          */
90         struct ast_taskprocessor *control_tps;
91 };
92
93 /*!
94  * \brief states for worker threads
95  */
96 enum worker_state {
97         /*! The worker is either active or idle */
98         ALIVE,
99         /*! The worker has been asked to shut down. */
100         DEAD,
101 };
102
103 /* Worker thread forward declarations. See definitions for documentation */
104 struct worker_thread;
105 static int worker_thread_hash(const void *obj, int flags);
106 static int worker_thread_cmp(void *obj, void *arg, int flags);
107 static void worker_thread_destroy(void *obj);
108 static void worker_active(struct worker_thread *worker);
109 static void *worker_start(void *arg);
110 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
111 static int worker_idle(struct worker_thread *worker);
112 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
113 static void worker_shutdown(struct worker_thread *worker);
114
115 /*!
116  * \brief Notify the threadpool listener that the state has changed.
117  *
118  * This notifies the threadpool listener via its state_changed callback.
119  * \param pool The threadpool whose state has changed
120  */
121 static void threadpool_send_state_changed(struct ast_threadpool *pool)
122 {
123         int active_size = ao2_container_count(pool->active_threads);
124         int idle_size = ao2_container_count(pool->idle_threads);
125
126         pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
127 }
128
129 /*!
130  * \brief Struct used for queued operations involving worker state changes
131  */
132 struct thread_worker_pair {
133         /*! Threadpool that contains the worker whose state has changed */
134         struct ast_threadpool *pool;
135         /*! Worker whose state has changed */
136         struct worker_thread *worker;
137 };
138
139 /*!
140  * \brief Destructor for thread_worker_pair
141  */
142 static void thread_worker_pair_destructor(void *obj)
143 {
144         struct thread_worker_pair *pair = obj;
145         ao2_ref(pair->worker, -1);
146 }
147
148 /*!
149  * \brief Allocate and initialize a thread_worker_pair
150  * \param pool Threadpool to assign to the thread_worker_pair
151  * \param worker Worker thread to assign to the thread_worker_pair
152  */
153 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
154                 struct worker_thread *worker)
155 {
156         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
157         if (!pair) {
158                 return NULL;
159         }
160         pair->pool = pool;
161         ao2_ref(worker, +1);
162         pair->worker = worker;
163         return pair;
164 }
165
166 /*!
167  * \brief Move a worker thread from the active container to the idle container.
168  *
169  * This function is called from the threadpool's control taskprocessor thread.
170  * \param data A thread_worker_pair containing the threadpool and the worker to move.
171  * \return 0
172  */
173 static int queued_active_thread_idle(void *data)
174 {
175         struct thread_worker_pair *pair = data;
176
177         ao2_link(pair->pool->idle_threads, pair->worker);
178         ao2_unlink(pair->pool->active_threads, pair->worker);
179
180         threadpool_send_state_changed(pair->pool);
181
182         ao2_ref(pair, -1);
183         return 0;
184 }
185
186 /*!
187  * \brief Queue a task to move a thread from the active list to the idle list
188  *
189  * This is called by a worker thread when it runs out of tasks to perform and
190  * goes idle.
191  * \param pool The threadpool to which the worker belongs
192  * \param worker The worker thread that has gone idle
193  */
194 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
195                 struct worker_thread *worker)
196 {
197         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
198         if (!pair) {
199                 return;
200         }
201         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
202 }
203
204 /*!
205  * \brief Execute a task in the threadpool
206  * 
207  * This is the function that worker threads call in order to execute tasks
208  * in the threadpool
209  *
210  * \param pool The pool to which the tasks belong.
211  * \retval 0 Either the pool has been shut down or there are no tasks.
212  * \retval 1 There are still tasks remaining in the pool.
213  */
214 static int threadpool_execute(struct ast_threadpool *pool)
215 {
216         return ast_taskprocessor_execute(pool->tps);
217 }
218
219 /*!
220  * \brief Destroy a threadpool's components.
221  *
222  * This is the destructor called automatically when the threadpool's
223  * reference count reaches zero. This is not to be confused with
224  * threadpool_destroy.
225  *
226  * By the time this actually gets called, most of the cleanup has already
227  * been done in the pool. The only thing left to do is to release the
228  * final reference to the threadpool listener.
229  *
230  * \param obj The pool to destroy
231  */
232 static void threadpool_destructor(void *obj)
233 {
234         struct ast_threadpool *pool = obj;
235         ao2_cleanup(pool->listener);
236 }
237
238 /*
239  * \brief Allocate a threadpool
240  *
241  * This is implemented as a taskprocessor listener's alloc callback. This
242  * is because the threadpool exists as the private data on a taskprocessor
243  * listener.
244  *
245  * \param listener The taskprocessor listener where the threadpool will live.
246  * \retval NULL Could not initialize threadpool properly
247  * \retval non-NULL The newly-allocated threadpool
248  */
249 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
250 {
251         RAII_VAR(struct ast_threadpool *, pool,
252                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
253
254         pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
255         if (!pool->control_tps) {
256                 return NULL;
257         }
258         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
259         if (!pool->active_threads) {
260                 return NULL;
261         }
262         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
263         if (!pool->idle_threads) {
264                 return NULL;
265         }
266
267         ao2_ref(pool, +1);
268         return pool;
269 }
270
271 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
272 {
273         return 0;
274 }
275
276 /*!
277  * \brief helper used for queued task when tasks are pushed
278  */
279 struct task_pushed_data {
280         /*! Pool into which a task was pushed */
281         struct ast_threadpool *pool;
282         /*! Indicator of whether the pool had no tasks prior to the new task being added */
283         int was_empty;
284 };
285
286 /*!
287  * \brief Allocate and initialize a task_pushed_data
288  * \param pool The threadpool to set in the task_pushed_data
289  * \param was_empty The was_empty value to set in the task_pushed_data
290  * \retval NULL Unable to allocate task_pushed_data
291  * \retval non-NULL The newly-allocated task_pushed_data
292  */
293 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
294                 int was_empty)
295 {
296         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
297
298         if (!tpd) {
299                 return NULL;
300         }
301         tpd->pool = pool;
302         tpd->was_empty = was_empty;
303         return tpd;
304 }
305
306 /*!
307  * \brief Activate idle threads
308  *
309  * This function always returns CMP_MATCH because all threads that this
310  * function acts on need to be seen as matches so they are unlinked from the
311  * list of idle threads.
312  *
313  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
314  * \param obj The worker to activate
315  * \param arg The pool where the worker belongs
316  * \retval CMP_MATCH
317  */
318 static int activate_threads(void *obj, void *arg, int flags)
319 {
320         struct worker_thread *worker = obj;
321         struct ast_threadpool *pool = arg;
322
323         ao2_link(pool->active_threads, worker);
324         worker_set_state(worker, ALIVE);
325         return CMP_MATCH;
326 }
327
328 /*!
329  * \brief Queue task called when tasks are pushed into the threadpool
330  *
331  * This function first calls into the threadpool's listener to let it know
332  * that a task has been pushed. It then wakes up all idle threads and moves
333  * them into the active thread container.
334  * \param data A task_pushed_data
335  * \return 0
336  */
337 static int handle_task_pushed(void *data)
338 {
339         struct task_pushed_data *tpd = data;
340         struct ast_threadpool *pool = tpd->pool;
341         int was_empty = tpd->was_empty;
342
343         pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
344         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
345                         activate_threads, pool);
346         ao2_ref(tpd, -1);
347         return 0;
348 }
349
350 /*!
351  * \brief Taskprocessor listener callback called when a task is added
352  *
353  * The threadpool uses this opportunity to queue a task on its control taskprocessor
354  * in order to activate idle threads and notify the threadpool listener that the
355  * task has been pushed.
356  * \param listener The taskprocessor listener. The threadpool is the listener's private data
357  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
358  */
359 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
360                 int was_empty)
361 {
362         struct ast_threadpool *pool = listener->private_data;
363         struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
364
365         if (!tpd) {
366                 return;
367         }
368
369         ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
370 }
371
372 /*!
373  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
374  *
375  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
376  * \param data The pool that has become empty
377  * \return 0
378  */
379 static int handle_emptied(void *data)
380 {
381         struct ast_threadpool *pool = data;
382
383         pool->listener->callbacks->emptied(pool, pool->listener);
384         return 0;
385 }
386
387 /*!
388  * \brief Taskprocessor listener emptied callback
389  *
390  * The threadpool queues a task to let the threadpool listener know that
391  * the threadpool no longer contains any tasks.
392  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
393  */
394 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
395 {
396         struct ast_threadpool *pool = listener->private_data;
397
398         ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
399 }
400
401 /*!
402  * \brief Taskprocessor listener shutdown callback
403  *
404  * The threadpool will shut down and destroy all of its worker threads when
405  * this is called back. By the time this gets called, the taskprocessor's
406  * control taskprocessor has already been destroyed. Therefore there is no risk
407  * in outright destroying the worker threads here.
408  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
409  */
410 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
411 {
412         struct ast_threadpool *pool = listener->private_data;
413
414         ao2_cleanup(pool->active_threads);
415         ao2_cleanup(pool->idle_threads);
416 }
417
418 /*!
419  * \brief Taskprocessor listener destroy callback
420  *
421  * Since the threadpool is an ao2 object, all that is necessary is to
422  * decrease the refcount. Since the control taskprocessor should already
423  * be destroyed by this point, this should be the final reference to the
424  * threadpool.
425  *
426  * \param private_data The threadpool to destroy
427  */
428 static void threadpool_destroy(void *private_data)
429 {
430         struct ast_threadpool *pool = private_data;
431         ao2_cleanup(pool);
432 }
433
434 /*!
435  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
436  */
437 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
438         .alloc = threadpool_alloc,
439         .start = threadpool_tps_start,
440         .task_pushed = threadpool_tps_task_pushed,
441         .emptied = threadpool_tps_emptied,
442         .shutdown = threadpool_tps_shutdown,
443         .destroy = threadpool_destroy,
444 };
445
446 /*!
447  * \brief Add threads to the threadpool
448  *
449  * This function is called from the threadpool's control taskprocessor thread.
450  * \param pool The pool that is expanding
451  * \delta The number of threads to add to the pool
452  */
453 static void grow(struct ast_threadpool *pool, int delta)
454 {
455         int i;
456         for (i = 0; i < delta; ++i) {
457                 struct worker_thread *worker = worker_thread_alloc(pool);
458                 if (!worker) {
459                         return;
460                 }
461                 ao2_link(pool->active_threads, worker);
462         }
463 }
464
465 /*!
466  * \brief ao2 callback to kill a set number of threads.
467  *
468  * Threads will be unlinked from the container as long as the
469  * counter has not reached zero. The counter is decremented with
470  * each thread that is removed.
471  * \param obj The worker thread up for possible destruction
472  * \param arg The counter
473  * \param flags Unused
474  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
475  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
476  */
477 static int kill_threads(void *obj, void *arg, int flags)
478 {
479         int *num_to_kill = arg;
480
481         if ((*num_to_kill)-- > 0) {
482                 return CMP_MATCH;
483         } else {
484                 return CMP_STOP;
485         }
486 }
487
488 /*!
489  * \brief Remove threads from the threadpool
490  *
491  * The preference is to kill idle threads. However, if there are
492  * more threads to remove than there are idle threads, then active
493  * threads will be removed too.
494  *
495  * This function is called from the threadpool control taskprocessor thread.
496  *
497  * \param pool The threadpool to remove threads from
498  * \param delta The number of threads to remove
499  */
500 static void shrink(struct ast_threadpool *pool, int delta)
501 {
502         int idle_threads = ao2_container_count(pool->idle_threads);
503         int idle_threads_to_kill = MIN(delta, idle_threads);
504         int active_threads_to_kill = delta - idle_threads_to_kill;
505
506         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
507                         kill_threads, &idle_threads_to_kill);
508
509         ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
510                         kill_threads, &active_threads_to_kill);
511 }
512
513 /*!
514  * \brief Helper struct used for queued operations that change the size of the threadpool
515  */
516 struct set_size_data {
517         /*! The pool whose size is to change */
518         struct ast_threadpool *pool;
519         /*! The requested new size of the pool */
520         unsigned int size;
521 };
522
523 /*!
524  * \brief Allocate and initialize a set_size_data
525  * \param pool The pool for the set_size_data
526  * \param size The size to store in the set_size_data
527  */
528 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
529                 unsigned int size)
530 {
531         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
532         if (!ssd) {
533                 return NULL;
534         }
535
536         ssd->pool = pool;
537         ssd->size = size;
538         return ssd;
539 }
540
541 /*!
542  * \brief Change the size of the threadpool
543  *
544  * This can either result in shrinking or growing the threadpool depending
545  * on the new desired size and the current size.
546  *
547  * This function is run from the threadpool control taskprocessor thread
548  *
549  * \param data A set_size_data used for determining how to act
550  * \return 0
551  */
552 static int queued_set_size(void *data)
553 {
554         struct set_size_data *ssd = data;
555         struct ast_threadpool *pool = ssd->pool;
556         unsigned int new_size = ssd->size;
557         unsigned int current_size = ao2_container_count(pool->active_threads) +
558                 ao2_container_count(pool->idle_threads);
559
560         if (current_size == new_size) {
561                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
562                                 new_size, current_size);
563                 return 0;
564         }
565
566         if (current_size < new_size) {
567                 grow(pool, new_size - current_size);
568         } else {
569                 shrink(pool, current_size - new_size);
570         }
571
572         threadpool_send_state_changed(pool);
573         ao2_ref(ssd, -1);
574         return 0;
575 }
576
577 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
578 {
579         struct set_size_data *ssd;
580
581         ssd = set_size_data_alloc(pool, size);
582         if (!ssd) {
583                 return;
584         }
585
586         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
587 }
588
589 static void listener_destructor(void *obj)
590 {
591         struct ast_threadpool_listener *listener = obj;
592
593         listener->callbacks->destroy(listener->private_data);
594 }
595
596 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
597                 const struct ast_threadpool_listener_callbacks *callbacks)
598 {
599         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
600         if (!listener) {
601                 return NULL;
602         }
603         listener->callbacks = callbacks;
604         listener->private_data = listener->callbacks->alloc(listener);
605         if (!listener->private_data) {
606                 ao2_ref(listener, -1);
607                 return NULL;
608         }
609         return listener;
610 }
611
612 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
613 {
614         struct ast_threadpool *pool;
615         struct ast_taskprocessor *tps;
616         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
617                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
618                         ao2_cleanup);
619
620         if (!tps_listener) {
621                 return NULL;
622         }
623
624         tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
625
626         if (!tps) {
627                 return NULL;
628         }
629
630         pool = tps_listener->private_data;
631         pool->tps = tps;
632         ast_log(LOG_NOTICE, "The taskprocessor I've created is located at %p\n", pool->tps);
633         ao2_ref(listener, +1);
634         pool->listener = listener;
635         ast_threadpool_set_size(pool, initial_size);
636         return pool;
637 }
638
639 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
640 {
641         return ast_taskprocessor_push(pool->tps, task, data);
642 }
643
644 void ast_threadpool_shutdown(struct ast_threadpool *pool)
645 {
646         /* Shut down the taskprocessors and everything else just
647          * takes care of itself via the taskprocessor callbacks
648          */
649         ast_taskprocessor_unreference(pool->control_tps);
650         ast_taskprocessor_unreference(pool->tps);
651 }
652
653 /*!
654  * A thread that executes threadpool tasks
655  */
656 struct worker_thread {
657         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
658         int id;
659         /*! Condition used in conjunction with state changes */
660         ast_cond_t cond;
661         /*! Lock used alongside the condition for state changes */
662         ast_mutex_t lock;
663         /*! The actual thread that is executing tasks */
664         pthread_t thread;
665         /*! A pointer to the threadpool. Needed to be able to execute tasks */
666         struct ast_threadpool *pool;
667         /*! The current state of the worker thread */
668         enum worker_state state;
669         /*! A boolean used to determine if an idle thread should become active */
670         int wake_up;
671 };
672
673 /*!
674  * A monotonically increasing integer used for worker
675  * thread identification.
676  */
677 static int worker_id_counter;
678
679 static int worker_thread_hash(const void *obj, int flags)
680 {
681         const struct worker_thread *worker = obj;
682
683         return worker->id;
684 }
685
686 static int worker_thread_cmp(void *obj, void *arg, int flags)
687 {
688         struct worker_thread *worker1 = obj;
689         struct worker_thread *worker2 = arg;
690
691         return worker1->id == worker2->id ? CMP_MATCH : 0;
692 }
693
694 /*!
695  * \brief shut a worker thread down
696  *
697  * Set the worker dead and then wait for its thread
698  * to finish executing.
699  *
700  * \param worker The worker thread to shut down
701  */
702 static void worker_shutdown(struct worker_thread *worker)
703 {
704         worker_set_state(worker, DEAD);
705         if (worker->thread != AST_PTHREADT_NULL) {
706                 pthread_join(worker->thread, NULL);
707                 worker->thread = AST_PTHREADT_NULL;
708         }
709 }
710
711 /*!
712  * \brief Worker thread destructor
713  *
714  * Called automatically when refcount reaches 0. Shuts
715  * down the worker thread and destroys its component
716  * parts
717  */
718 static void worker_thread_destroy(void *obj)
719 {
720         struct worker_thread *worker = obj;
721         worker_shutdown(worker);
722         ast_mutex_destroy(&worker->lock);
723         ast_cond_destroy(&worker->cond);
724 }
725
726 /*!
727  * \brief start point for worker threads
728  *
729  * Worker threads start in the active state but may
730  * immediately go idle if there is no work to be
731  * done
732  *
733  * \param arg The worker thread
734  * \retval NULL
735  */
736 static void *worker_start(void *arg)
737 {
738         struct worker_thread *worker = arg;
739
740         worker_active(worker);
741         return NULL;
742 }
743
744 /*!
745  * \brief Allocate and initialize a new worker thread
746  *
747  * This will create, initialize, and start the thread.
748  *
749  * \param pool The threadpool to which the worker will be added
750  * \retval NULL Failed to allocate or start the worker thread
751  * \retval non-NULL The newly-created worker thread
752  */
753 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
754 {
755         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
756         if (!worker) {
757                 return NULL;
758         }
759         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
760         ast_mutex_init(&worker->lock);
761         ast_cond_init(&worker->cond, NULL);
762         worker->pool = pool;
763         worker->thread = AST_PTHREADT_NULL;
764         worker->state = ALIVE;
765         if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
766                 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
767                 ao2_ref(worker, -1);
768                 return NULL;
769         }
770         return worker;
771 }
772
773 /*!
774  * \brief Active loop for worker threads
775  *
776  * The worker will stay in this loop for its lifetime,
777  * executing tasks as they become available. If there
778  * are no tasks currently available, then the thread
779  * will go idle.
780  *
781  * \param worker The worker thread executing tasks.
782  */
783 static void worker_active(struct worker_thread *worker)
784 {
785         int alive = 1;
786         while (alive) {
787                 if (threadpool_execute(worker->pool) == 0) {
788                         alive = worker_idle(worker);
789                 }
790         }
791 }
792
793 /*!
794  * \brief Idle function for worker threads
795  *
796  * The worker waits here until it gets told by the threadpool
797  * to wake up.
798  *
799  * \param worker The idle worker
800  * \retval 0 The thread is being woken up so that it can conclude.
801  * \retval non-zero The thread is being woken up to do more work.
802  */
803 static int worker_idle(struct worker_thread *worker)
804 {
805         SCOPED_MUTEX(lock, &worker->lock);
806         if (worker->state != ALIVE) {
807                 return 0;
808         }
809         threadpool_active_thread_idle(worker->pool, worker);
810         while (!worker->wake_up) {
811                 ast_cond_wait(&worker->cond, lock);
812         }
813         worker->wake_up = 0;
814         return worker->state == ALIVE;
815 }
816
817 /*!
818  * \brief Change a worker's state
819  *
820  * The threadpool calls into this function in order to let a worker know
821  * how it should proceed.
822  */
823 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
824 {
825         SCOPED_MUTEX(lock, &worker->lock);
826         worker->state = state;
827         worker->wake_up = 1;
828         ast_cond_signal(&worker->cond);
829 }
830