1c2e9634e5754a15b0bb3876905ff791e8468963
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*! 
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*! 
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*! 
49          * \brief The container of zombie threads.
50          * Zombie threads may be running tasks, but they are scheduled to die soon
51          */
52         struct ao2_container *zombie_threads;
53         /*! 
54          * \brief The main taskprocessor
55          * 
56          * Tasks that are queued in this taskprocessor are
57          * doled out to the worker threads. Worker threads that
58          * execute tasks from the threadpool are executing tasks
59          * in this taskprocessor.
60          *
61          * The threadpool itself is actually the private data for
62          * this taskprocessor's listener. This way, as taskprocessor
63          * changes occur, the threadpool can alert its listeners
64          * appropriately.
65          */
66         struct ast_taskprocessor *tps;
67         /*!
68          * \brief The control taskprocessor
69          *
70          * This is a standard taskprocessor that uses the default
71          * taskprocessor listener. In other words, all tasks queued to
72          * this taskprocessor have a single thread that executes the
73          * tasks.
74          *
75          * All tasks that modify the state of the threadpool and all tasks
76          * that call out to threadpool listeners are pushed to this
77          * taskprocessor.
78          *
79          * For instance, when the threadpool changes sizes, a task is put
80          * into this taskprocessor to do so. When it comes time to tell the
81          * threadpool listener that worker threads have changed state,
82          * the task is placed in this taskprocessor.
83          *
84          * This is done for three main reasons
85          * 1) It ensures that listeners are given an accurate portrayal
86          * of the threadpool's current state. In other words, when a listener
87          * gets told a count of active, idle and zombie threads, it does not
88          * need to worry that internal state of the threadpool might be different
89          * from what it has been told.
90          * 2) It minimizes the locking required in both the threadpool and in
91          * threadpool listener's callbacks.
92          * 3) It ensures that listener callbacks are called in the same order
93          * that the threadpool had its state change.
94          */
95         struct ast_taskprocessor *control_tps;
96 };
97
98 /*!
99  * \brief states for worker threads
100  */
101 enum worker_state {
102         /*! The worker is either active or idle */
103         ALIVE,
104         /*!
105          * The worker has been asked to shut down but
106          * may still be in the process of executing tasks.
107          * This transition happens when the threadpool needs
108          * to shrink and needs to kill active threads in order
109          * to do so.
110          */
111         ZOMBIE,
112         /*!
113          * The worker has been asked to shut down. Typically
114          * only idle threads go to this state directly, but
115          * active threads may go straight to this state when
116          * the threadpool is shut down.
117          */
118         DEAD,
119 };
120
121 /* Worker thread forward declarations. See definitions for documentation */
122 struct worker_thread;
123 static int worker_thread_hash(const void *obj, int flags);
124 static int worker_thread_cmp(void *obj, void *arg, int flags);
125 static void worker_thread_destroy(void *obj);
126 static void worker_active(struct worker_thread *worker);
127 static void *worker_start(void *arg);
128 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
129 static int worker_idle(struct worker_thread *worker);
130 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
131 static int worker_shutdown(void *obj, void *arg, int flags);
132
133 /*!
134  * \brief Notify the threadpool listener that the state has changed.
135  *
136  * This notifies the threadpool listener via its state_changed callback.
137  * \param pool The threadpool whose state has changed
138  */
139 static void threadpool_send_state_changed(struct ast_threadpool *pool)
140 {
141         int active_size = ao2_container_count(pool->active_threads);
142         int idle_size = ao2_container_count(pool->idle_threads);
143         int zombie_size = ao2_container_count(pool->zombie_threads);
144
145         pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
146 }
147
148 /*!
149  * \brief Struct used for queued operations involving worker state changes
150  */
151 struct thread_worker_pair {
152         /*! Threadpool that contains the worker whose state has changed */
153         struct ast_threadpool *pool;
154         /*! Worker whose state has changed */
155         struct worker_thread *worker;
156 };
157
158 /*!
159  * \brief Destructor for thread_worker_pair
160  */
161 static void thread_worker_pair_destructor(void *obj)
162 {
163         struct thread_worker_pair *pair = obj;
164         ao2_ref(pair->pool, -1);
165         ao2_ref(pair->worker, -1);
166 }
167
168 /*!
169  * \brief Allocate and initialize a thread_worker_pair
170  * \param pool Threadpool to assign to the thread_worker_pair
171  * \param worker Worker thread to assign to the thread_worker_pair
172  */
173 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
174                 struct worker_thread *worker)
175 {
176         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
177         if (!pair) {
178                 return NULL;
179         }
180         ao2_ref(pool, +1);
181         pair->pool = pool;
182         ao2_ref(worker, +1);
183         pair->worker = worker;
184         return pair;
185 }
186
187 /*!
188  * \brief Move a worker thread from the active container to the idle container.
189  *
190  * This function is called from the threadpool's control taskprocessor thread.
191  * \param data A thread_worker_pair containing the threadpool and the worker to move.
192  * \return 0
193  */
194 static int queued_active_thread_idle(void *data)
195 {
196         struct thread_worker_pair *pair = data;
197
198         ao2_link(pair->pool->idle_threads, pair->worker);
199         ao2_unlink(pair->pool->active_threads, pair->worker);
200
201         threadpool_send_state_changed(pair->pool);
202
203         ao2_ref(pair, -1);
204         return 0;
205 }
206
207 /*!
208  * \brief Queue a task to move a thread from the active list to the idle list
209  *
210  * This is called by a worker thread when it runs out of tasks to perform and
211  * goes idle.
212  * \param pool The threadpool to which the worker belongs
213  * \param worker The worker thread that has gone idle
214  */
215 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
216                 struct worker_thread *worker)
217 {
218         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
219         if (!pair) {
220                 return;
221         }
222         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
223 }
224
225 /*!
226  * \brief Kill a zombie thread
227  *
228  * This runs from the threadpool's control taskprocessor thread.
229  *
230  * \param data A thread_worker_pair containing the threadpool and the zombie thread
231  * \return 0
232  */
233 static int queued_zombie_thread_dead(void *data)
234 {
235         struct thread_worker_pair *pair = data;
236
237         ao2_unlink(pair->pool, pair->worker);
238         threadpool_send_state_changed(pair->pool);
239
240         ao2_ref(pair, -1);
241         return 0;
242 }
243
244 /*!
245  * \brief Queue a task to kill a zombie thread
246  *
247  * This is called by a worker thread when it acknowledges that it is time for
248  * it to die.
249  */
250 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
251                 struct worker_thread *worker)
252 {
253         struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
254         if (!pair) {
255                 return;
256         }
257         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
258 }
259
260 /*!
261  * \brief Execute a task in the threadpool
262  * 
263  * This is the function that worker threads call in order to execute tasks
264  * in the threadpool
265  *
266  * \param pool The pool to which the tasks belong.
267  * \retval 0 Either the pool has been shut down or there are no tasks.
268  * \retval 1 There are still tasks remaining in the pool.
269  */
270 static int threadpool_execute(struct ast_threadpool *pool)
271 {
272         return ast_taskprocessor_execute(pool->tps);
273 }
274
275 /*!
276  * \brief Destroy a threadpool's components.
277  *
278  * This is the destructor called automatically when the threadpool's
279  * reference count reaches zero. This is not to be confused with
280  * threadpool_destroy.
281  *
282  * By the time this actually gets called, most of the cleanup has already
283  * been done in the pool. The only thing left to do is to release the
284  * final reference to the threadpool listener.
285  *
286  * \param obj The pool to destroy
287  */
288 static void threadpool_destructor(void *obj)
289 {
290         struct ast_threadpool *pool = obj;
291         /* XXX Probably should let the listener know we're being destroyed? */
292
293         /* Threads should all be shut down by now, so this should be a painless
294          * operation
295          */
296         ao2_cleanup(pool->listener);
297 }
298
299 /*
300  * \brief Allocate a threadpool
301  *
302  * This is implemented as a taskprocessor listener's alloc callback. This
303  * is because the threadpool exists as the private data on a taskprocessor
304  * listener.
305  *
306  * \param listener The taskprocessor listener where the threadpool will live.
307  * \retval NULL Could not initialize threadpool properly
308  * \retval non-NULL The newly-allocated threadpool
309  */
310 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
311 {
312         RAII_VAR(struct ast_threadpool *, pool,
313                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
314
315         pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
316         if (!pool->control_tps) {
317                 return NULL;
318         }
319         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
320         if (!pool->active_threads) {
321                 return NULL;
322         }
323         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
324         if (!pool->idle_threads) {
325                 return NULL;
326         }
327         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
328         if (!pool->zombie_threads) {
329                 return NULL;
330         }
331
332         pool->tps = listener->tps;
333
334         ao2_ref(pool, +1);
335         return pool;
336 }
337
338 /*!
339  * \brief helper used for queued task when tasks are pushed
340  */
341 struct task_pushed_data {
342         /*! Pool into which a task was pushed */
343         struct ast_threadpool *pool;
344         /*! Indicator of whether the pool had no tasks prior to the new task being added */
345         int was_empty;
346 };
347
348 /*!
349  * \brief Destructor for task_pushed_data
350  */
351 static void task_pushed_data_destroy(void *obj)
352 {
353         struct task_pushed_data *tpd = obj;
354         ao2_ref(tpd->pool, -1);
355 }
356
357 /*!
358  * \brief Allocate and initialize a task_pushed_data
359  * \param pool The threadpool to set in the task_pushed_data
360  * \param was_empty The was_empty value to set in the task_pushed_data
361  * \retval NULL Unable to allocate task_pushed_data
362  * \retval non-NULL The newly-allocated task_pushed_data
363  */
364 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
365                 int was_empty)
366 {
367         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd),
368                         task_pushed_data_destroy);
369
370         if (!tpd) {
371                 return NULL;
372         }
373         ao2_ref(pool, +1);
374         tpd->pool = pool;
375         tpd->was_empty = was_empty;
376         return tpd;
377 }
378
379 /*!
380  * \brief Activate idle threads
381  *
382  * This function always returns CMP_MATCH because all threads that this
383  * function acts on need to be seen as matches so they are unlinked from the
384  * list of idle threads.
385  *
386  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
387  * \param obj The worker to activate
388  * \param arg The pool where the worker belongs
389  * \retval CMP_MATCH
390  */
391 static int activate_threads(void *obj, void *arg, int flags)
392 {
393         struct worker_thread *worker = obj;
394         struct ast_threadpool *pool = arg;
395
396         ao2_link(pool->active_threads, worker);
397         worker_set_state(worker, ALIVE);
398         return CMP_MATCH;
399 }
400
401 /*!
402  * \brief Queue task called when tasks are pushed into the threadpool
403  *
404  * This function first calls into the threadpool's listener to let it know
405  * that a task has been pushed. It then wakes up all idle threads and moves
406  * them into the active thread container.
407  * \param data A task_pushed_data
408  * \return 0
409  */
410 static int handle_task_pushed(void *data)
411 {
412         struct task_pushed_data *tpd = data;
413         struct ast_threadpool *pool = tpd->pool;
414         int was_empty = tpd->was_empty;
415
416         pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
417         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
418                         activate_threads, pool);
419         ao2_ref(tpd, -1);
420         return 0;
421 }
422
423 /*!
424  * \brief Taskprocessor listener callback called when a task is added
425  *
426  * The threadpool uses this opportunity to queue a task on its control taskprocessor
427  * in order to activate idle threads and notify the threadpool listener that the
428  * task has been pushed.
429  * \param listener The taskprocessor listener. The threadpool is the listener's private data
430  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
431  */
432 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
433                 int was_empty)
434 {
435         struct ast_threadpool *pool = listener->private_data;
436         struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
437
438         if (!tpd) {
439                 return;
440         }
441
442         ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
443 }
444
445 /*!
446  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
447  *
448  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
449  * \param data The pool that has become empty
450  * \return 0
451  */
452 static int handle_emptied(void *data)
453 {
454         struct ast_threadpool *pool = data;
455
456         pool->listener->callbacks->emptied(pool->listener);
457         ao2_ref(pool, -1);
458         return 0;
459 }
460
461 /*!
462  * \brief Taskprocessor listener emptied callback
463  *
464  * The threadpool queues a task to let the threadpool listener know that
465  * the threadpool no longer contains any tasks.
466  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
467  */
468 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
469 {
470         struct ast_threadpool *pool = listener->private_data;
471
472         ao2_ref(pool, +1);
473         ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
474 }
475
476 /*!
477  * \brief Taskprocessor listener shutdown callback
478  *
479  * The threadpool will shut down and destroy all of its worker threads when
480  * this is called back. By the time this gets called, the taskprocessor's
481  * control taskprocessor has already been destroyed. Therefore there is no risk
482  * in outright destroying the worker threads here.
483  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
484  */
485 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
486 {
487         struct ast_threadpool *pool = listener->private_data;
488         int flags = (OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE);
489
490         ao2_cleanup(pool->active_threads);
491         ao2_cleanup(pool->idle_threads);
492         ao2_cleanup(pool->zombie_threads);
493 }
494
495 /*!
496  * \brief Taskprocessor listener destroy callback
497  *
498  * Since the threadpool is an ao2 object, all that is necessary is to
499  * decrease the refcount. Since the control taskprocessor should already
500  * be destroyed by this point, this should be the final reference to the
501  * threadpool.
502  *
503  * \param private_data The threadpool to destroy
504  */
505 static void threadpool_destroy(void *private_data)
506 {
507         struct ast_threadpool *pool = private_data;
508         ao2_cleanup(pool);
509 }
510
511 /*!
512  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
513  */
514 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
515         .alloc = threadpool_alloc,
516         .task_pushed = threadpool_tps_task_pushed,
517         .emptied = threadpool_tps_emptied,
518         .shutdown = threadpool_tps_shutdown,
519         .destroy = threadpool_destroy,
520 };
521
522 /*!
523  * \brief Add threads to the threadpool
524  *
525  * This function is called from the threadpool's control taskprocessor thread.
526  * \param pool The pool that is expanding
527  * \delta The number of threads to add to the pool
528  */
529 static void grow(struct ast_threadpool *pool, int delta)
530 {
531         int i;
532         for (i = 0; i < delta; ++i) {
533                 struct worker_thread *worker = worker_thread_alloc(pool);
534                 if (!worker) {
535                         return;
536                 }
537                 ao2_link(pool->active_threads, worker);
538         }
539 }
540
541 /*!
542  * \brief ao2 callback to kill a set number of threads.
543  *
544  * Threads will be unlinked from the container as long as the
545  * counter has not reached zero. The counter is decremented with
546  * each thread that is removed.
547  * \param obj The worker thread up for possible destruction
548  * \param arg The counter
549  * \param flags Unused
550  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
551  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
552  */
553 static int kill_threads(void *obj, void *arg, int flags)
554 {
555         struct worker_thread *worker = obj;
556         int *num_to_kill = arg;
557
558         if ((*num_to_kill)-- > 0) {
559                 return CMP_MATCH;
560         } else {
561                 return CMP_STOP;
562         }
563 }
564
565 /*!
566  * \brief ao2 callback to zombify a set number of threads.
567  *
568  * Threads will be zombified as long as as the counter has not reached
569  * zero. The counter is decremented with each thread that is zombified.
570  *
571  * Zombifying a thread involves removing it from its current container,
572  * adding it to the zombie container, and changing the state of the
573  * worker to a zombie
574  *
575  * This callback is called from the threadpool control taskprocessor thread.
576  *
577  * \param obj The worker thread that may be zombified
578  * \param arg The pool to which the worker belongs
579  * \param data The counter
580  * \param flags Unused
581  * \retval CMP_MATCH The zombified thread should be removed from its current container
582  * \retval CMP_STOP Stop attempting to zombify threads
583  */
584 static int zombify_threads(void *obj, void *arg, void *data, int flags)
585 {
586         struct worker_thread *worker = obj;
587         struct ast_threadpool *pool = arg;
588         int *num_to_zombify = data;
589
590         if ((*num_to_zombify)-- > 0) {
591                 ao2_link(pool->zombie_threads, worker);
592                 worker_set_state(worker, ZOMBIE);
593                 return CMP_MATCH;
594         } else {
595                 return CMP_STOP;
596         }
597 }
598
599 /*!
600  * \brief Remove threads from the threadpool
601  *
602  * The preference is to kill idle threads. However, if there are
603  * more threads to remove than there are idle threads, then active
604  * threads will be zombified instead.
605  *
606  * This function is called from the threadpool control taskprocessor thread.
607  *
608  * \param pool The threadpool to remove threads from
609  * \param delta The number of threads to remove
610  */
611 static void shrink(struct ast_threadpool *pool, int delta)
612 {
613         /* 
614          * Preference is to kill idle threads, but
615          * we'll move on to deactivating active threads
616          * if we have to
617          */
618         int idle_threads = ao2_container_count(pool->idle_threads);
619         int idle_threads_to_kill = MIN(delta, idle_threads);
620         int active_threads_to_zombify = delta - idle_threads_to_kill;
621
622         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
623                         kill_threads, &idle_threads_to_kill);
624
625         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
626                         zombify_threads, pool, &active_threads_to_zombify);
627 }
628
629 /*!
630  * \brief Helper struct used for queued operations that change the size of the threadpool
631  */
632 struct set_size_data {
633         /*! The pool whose size is to change */
634         struct ast_threadpool *pool;
635         /*! The requested new size of the pool */
636         unsigned int size;
637 };
638
639 /*!
640  * \brief Destructor for set_size_data
641  * \param obj The set_size_data to destroy
642  */
643 static void set_size_data_destroy(void *obj)
644 {
645         struct set_size_data *ssd = obj;
646         ao2_ref(ssd->pool, -1);
647 }
648
649 /*!
650  * \brief Allocate and initialize a set_size_data
651  * \param pool The pool for the set_size_data
652  * \param size The size to store in the set_size_data
653  */
654 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
655                 unsigned int size)
656 {
657         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
658         if (!ssd) {
659                 return NULL;
660         }
661
662         ao2_ref(pool, +1);
663         ssd->pool = pool;
664         ssd->size = size;
665         return ssd;
666 }
667
668 /*!
669  * \brief Change the size of the threadpool
670  *
671  * This can either result in shrinking or growing the threadpool depending
672  * on the new desired size and the current size.
673  *
674  * This function is run from the threadpool control taskprocessor thread
675  *
676  * \param data A set_size_data used for determining how to act
677  * \return 0
678  */
679 static int queued_set_size(void *data)
680 {
681         struct set_size_data *ssd = data;
682         struct ast_threadpool *pool = ssd->pool;
683         unsigned int num_threads = ssd->size;
684
685         /* We don't count zombie threads as being "live when potentially resizing */
686         unsigned int current_size = ao2_container_count(pool->active_threads) +
687                 ao2_container_count(pool->idle_threads);
688
689         if (current_size == num_threads) {
690                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
691                                 num_threads, current_size);
692                 return 0;
693         }
694
695         if (current_size < num_threads) {
696                 grow(pool, num_threads - current_size);
697         } else {
698                 shrink(pool, current_size - num_threads);
699         }
700
701         threadpool_send_state_changed(pool);
702         ao2_ref(ssd, -1);
703         return 0;
704 }
705
706 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
707 {
708         struct set_size_data *ssd;
709
710         ssd = set_size_data_alloc(pool, size);
711         if (!ssd) {
712                 return;
713         }
714
715         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
716 }
717
718 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
719 {
720         struct ast_threadpool *pool;
721         struct ast_taskprocessor *tps;
722         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
723                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
724                         ao2_cleanup);
725
726         if (!tps_listener) {
727                 return NULL;
728         }
729
730         tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
731
732         if (!tps) {
733                 return NULL;
734         }
735
736         pool = tps_listener->private_data;
737         ast_threadpool_set_size(pool, initial_size);
738         return pool;
739 }
740
741 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
742 {
743         return ast_taskprocessor_push(pool->tps, task, data);
744 }
745
746 void ast_threadpool_shutdown(struct ast_threadpool *pool)
747 {
748         /* Shut down the taskprocessors and everything else just
749          * takes care of itself via the taskprocessor callbacks
750          */
751         ast_taskprocessor_unreference(pool->control_tps);
752         ast_taskprocessor_unreference(pool->tps);
753 }
754
755 /*!
756  * A thread that executes threadpool tasks
757  */
758 struct worker_thread {
759         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
760         int id;
761         /*! Condition used in conjunction with state changes */
762         ast_cond_t cond;
763         /*! Lock used alongside the condition for state changes */
764         ast_mutex_t lock;
765         /*! The actual thread that is executing tasks */
766         pthread_t thread;
767         /*! A pointer to the threadpool. Needed to be able to execute tasks */
768         struct ast_threadpool *pool;
769         /*! The current state of the worker thread */
770         enum worker_state state;
771         /*! A boolean used to determine if an idle thread should become active */
772         int wake_up;
773 };
774
775 /*!
776  * A monotonically increasing integer used for worker
777  * thread identification.
778  */
779 static int worker_id_counter;
780
781 static int worker_thread_hash(const void *obj, int flags)
782 {
783         const struct worker_thread *worker = obj;
784
785         return worker->id;
786 }
787
788 static int worker_thread_cmp(void *obj, void *arg, int flags)
789 {
790         struct worker_thread *worker1 = obj;
791         struct worker_thread *worker2 = arg;
792
793         return worker1->id == worker2->id ? CMP_MATCH : 0;
794 }
795
796 /*!
797  * \brief shut a worker thread down
798  *
799  * Set the worker dead and then wait for its thread
800  * to finish executing.
801  *
802  * \param worker The worker thread to shut down
803  */
804 static void worker_shutdown(struct worker_thread *worker)
805 {
806         struct worker_thread *worker = obj;
807
808         worker_set_state(worker, DEAD);
809         if (worker->thread != AST_PTHREADT_NULL) {
810                 pthread_join(worker->thread, NULL);
811                 worker->thread = AST_PTHREADT_NULL;
812         }
813 }
814
815 /*!
816  * \brief Worker thread destructor
817  *
818  * Called automatically when refcount reaches 0. Shuts
819  * down the worker thread and destroys its component
820  * parts
821  */
822 static void worker_thread_destroy(void *obj)
823 {
824         struct worker_thread *worker = obj;
825         worker_shutdown(worker);
826         ast_mutex_destroy(&worker->lock);
827         ast_cond_destroy(&worker->cond);
828 }
829
830 /*!
831  * \brief start point for worker threads
832  *
833  * Worker threads start in the active state but may
834  * immediately go idle if there is no work to be
835  * done
836  *
837  * \param arg The worker thread
838  * \retval NULL
839  */
840 static void *worker_start(void *arg)
841 {
842         struct worker_thread *worker = arg;
843
844         worker_active(worker);
845         return NULL;
846 }
847
848 /*!
849  * \brief Allocate and initialize a new worker thread
850  *
851  * This will create, initialize, and start the thread.
852  *
853  * \param pool The threadpool to which the worker will be added
854  * \retval NULL Failed to allocate or start the worker thread
855  * \retval non-NULL The newly-created worker thread
856  */
857 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
858 {
859         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
860         if (!worker) {
861                 return NULL;
862         }
863         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
864         ast_mutex_init(&worker->lock);
865         ast_cond_init(&worker->cond, NULL);
866         worker->pool = pool;
867         worker->thread = AST_PTHREADT_NULL;
868         worker->state = ALIVE;
869         if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
870                 ast_log(LOG_ERROR, "Unable to start worker thread!\n");
871                 ao2_ref(worker, -1);
872                 return NULL;
873         }
874         return worker;
875 }
876
877 /*!
878  * \brief Active loop for worker threads
879  *
880  * The worker will stay in this loop for its lifetime,
881  * executing tasks as they become available. If there
882  * are no tasks currently available, then the thread
883  * will go idle.
884  *
885  * \param worker The worker thread executing tasks.
886  */
887 static void worker_active(struct worker_thread *worker)
888 {
889         int alive = 1;
890         while (alive) {
891                 if (threadpool_execute(worker->pool)) {
892                         alive = worker_idle(worker);
893                 }
894         }
895
896         /* Reaching this portion means the thread is
897          * on death's door. It may have been killed while
898          * it was idle, in which case it can just die
899          * peacefully. If it's a zombie, though, then
900          * it needs to let the pool know so
901          * that the thread can be removed from the
902          * list of zombie threads.
903          */
904         if (worker->state == ZOMBIE) {
905                 threadpool_zombie_thread_dead(worker->pool, worker);
906         }
907 }
908
909 /*!
910  * \brief Idle function for worker threads
911  *
912  * The worker waits here until it gets told by the threadpool
913  * to wake up.
914  *
915  * \param worker The idle worker
916  * \retval 0 The thread is being woken up so that it can conclude.
917  * \retval non-zero The thread is being woken up to do more work.
918  */
919 static int worker_idle(struct worker_thread *worker)
920 {
921         SCOPED_MUTEX(lock, &worker->lock);
922         if (worker->state != ALIVE) {
923                 return 0;
924         }
925         threadpool_active_thread_idle(worker->pool, worker);
926         while (!worker->wake_up) {
927                 ast_cond_wait(&worker->cond, lock);
928         }
929         worker->wake_up = 0;
930         return worker->state == ALIVE;
931 }
932
933 /*!
934  * \brief Change a worker's state
935  *
936  * The threadpool calls into this function in order to let a worker know
937  * how it should proceed.
938  */
939 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
940 {
941         SCOPED_MUTEX(lock, &worker->lock);
942         worker->state = state;
943         worker->wake_up = 1;
944         ast_cond_signal(&worker->cond);
945 }
946