Make the initial size of the threadpool part of the options passed in.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*!
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*!
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*!
49          * \brief The container of zombie threads.
50          * Zombie threads may be running tasks, but they are scheduled to die soon
51          */
52         struct ao2_container *zombie_threads;
53         /*!
54          * \brief The main taskprocessor
55          *
56          * Tasks that are queued in this taskprocessor are
57          * doled out to the worker threads. Worker threads that
58          * execute tasks from the threadpool are executing tasks
59          * in this taskprocessor.
60          *
61          * The threadpool itself is actually the private data for
62          * this taskprocessor's listener. This way, as taskprocessor
63          * changes occur, the threadpool can alert its listeners
64          * appropriately.
65          */
66         struct ast_taskprocessor *tps;
67         /*!
68          * \brief The control taskprocessor
69          *
70          * This is a standard taskprocessor that uses the default
71          * taskprocessor listener. In other words, all tasks queued to
72          * this taskprocessor have a single thread that executes the
73          * tasks.
74          *
75          * All tasks that modify the state of the threadpool and all tasks
76          * that call out to threadpool listeners are pushed to this
77          * taskprocessor.
78          *
79          * For instance, when the threadpool changes sizes, a task is put
80          * into this taskprocessor to do so. When it comes time to tell the
81          * threadpool listener that worker threads have changed state,
82          * the task is placed in this taskprocessor.
83          *
84          * This is done for three main reasons
85          * 1) It ensures that listeners are given an accurate portrayal
86          * of the threadpool's current state. In other words, when a listener
87          * gets told a count of active, idle and zombie threads, it does not
88          * need to worry that internal state of the threadpool might be different
89          * from what it has been told.
90          * 2) It minimizes the locking required in both the threadpool and in
91          * threadpool listener's callbacks.
92          * 3) It ensures that listener callbacks are called in the same order
93          * that the threadpool had its state change.
94          */
95         struct ast_taskprocessor *control_tps;
96         /*! True if the threadpool is in the processof shutting down */
97         int shutting_down;
98         /*! Threadpool-specific options */
99         struct ast_threadpool_options options;
100 };
101
102 /*!
103  * \brief states for worker threads
104  */
105 enum worker_state {
106         /*! The worker is either active or idle */
107         ALIVE,
108         /*!
109          * The worker has been asked to shut down but
110          * may still be in the process of executing tasks.
111          * This transition happens when the threadpool needs
112          * to shrink and needs to kill active threads in order
113          * to do so.
114          */
115         ZOMBIE,
116         /*!
117          * The worker has been asked to shut down. Typically
118          * only idle threads go to this state directly, but
119          * active threads may go straight to this state when
120          * the threadpool is shut down.
121          */
122         DEAD,
123 };
124
125 /*!
126  * A thread that executes threadpool tasks
127  */
128 struct worker_thread {
129         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
130         int id;
131         /*! Condition used in conjunction with state changes */
132         ast_cond_t cond;
133         /*! Lock used alongside the condition for state changes */
134         ast_mutex_t lock;
135         /*! The actual thread that is executing tasks */
136         pthread_t thread;
137         /*! A pointer to the threadpool. Needed to be able to execute tasks */
138         struct ast_threadpool *pool;
139         /*! The current state of the worker thread */
140         enum worker_state state;
141         /*! A boolean used to determine if an idle thread should become active */
142         int wake_up;
143         /*! Options for this threadpool */
144         struct ast_threadpool_options options;
145 };
146
147 /* Worker thread forward declarations. See definitions for documentation */
148 static int worker_thread_hash(const void *obj, int flags);
149 static int worker_thread_cmp(void *obj, void *arg, int flags);
150 static void worker_thread_destroy(void *obj);
151 static void worker_active(struct worker_thread *worker);
152 static void *worker_start(void *arg);
153 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
154 static int worker_thread_start(struct worker_thread *worker);
155 static int worker_idle(struct worker_thread *worker);
156 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
157 static void worker_shutdown(struct worker_thread *worker);
158
159 /*!
160  * \brief Notify the threadpool listener that the state has changed.
161  *
162  * This notifies the threadpool listener via its state_changed callback.
163  * \param pool The threadpool whose state has changed
164  */
165 static void threadpool_send_state_changed(struct ast_threadpool *pool)
166 {
167         int active_size = ao2_container_count(pool->active_threads);
168         int idle_size = ao2_container_count(pool->idle_threads);
169
170         if (pool->listener && pool->listener->callbacks->state_changed) {
171                 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
172         }
173 }
174
175 /*!
176  * \brief Struct used for queued operations involving worker state changes
177  */
178 struct thread_worker_pair {
179         /*! Threadpool that contains the worker whose state has changed */
180         struct ast_threadpool *pool;
181         /*! Worker whose state has changed */
182         struct worker_thread *worker;
183 };
184
185 /*!
186  * \brief Destructor for thread_worker_pair
187  */
188 static void thread_worker_pair_destructor(void *obj)
189 {
190         struct thread_worker_pair *pair = obj;
191         ao2_ref(pair->worker, -1);
192 }
193
194 /*!
195  * \brief Allocate and initialize a thread_worker_pair
196  * \param pool Threadpool to assign to the thread_worker_pair
197  * \param worker Worker thread to assign to the thread_worker_pair
198  */
199 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
200                 struct worker_thread *worker)
201 {
202         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
203         if (!pair) {
204                 return NULL;
205         }
206         pair->pool = pool;
207         ao2_ref(worker, +1);
208         pair->worker = worker;
209         return pair;
210 }
211
212 /*!
213  * \brief Move a worker thread from the active container to the idle container.
214  *
215  * This function is called from the threadpool's control taskprocessor thread.
216  * \param data A thread_worker_pair containing the threadpool and the worker to move.
217  * \return 0
218  */
219 static int queued_active_thread_idle(void *data)
220 {
221         struct thread_worker_pair *pair = data;
222
223         ao2_link(pair->pool->idle_threads, pair->worker);
224         ao2_unlink(pair->pool->active_threads, pair->worker);
225
226         threadpool_send_state_changed(pair->pool);
227
228         ao2_ref(pair, -1);
229         return 0;
230 }
231
232 /*!
233  * \brief Queue a task to move a thread from the active list to the idle list
234  *
235  * This is called by a worker thread when it runs out of tasks to perform and
236  * goes idle.
237  * \param pool The threadpool to which the worker belongs
238  * \param worker The worker thread that has gone idle
239  */
240 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
241                 struct worker_thread *worker)
242 {
243         struct thread_worker_pair *pair;
244         SCOPED_AO2LOCK(lock, pool);
245         if (pool->shutting_down) {
246                 return;
247         }
248         pair = thread_worker_pair_alloc(pool, worker);
249         if (!pair) {
250                 return;
251         }
252         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
253 }
254
255 /*!
256  * \brief Kill a zombie thread
257  *
258  * This runs from the threadpool's control taskprocessor thread.
259  *
260  * \param data A thread_worker_pair containing the threadpool and the zombie thread
261  * \return 0
262  */
263 static int queued_zombie_thread_dead(void *data)
264 {
265         struct thread_worker_pair *pair = data;
266
267         ao2_unlink(pair->pool->zombie_threads, pair->worker);
268         threadpool_send_state_changed(pair->pool);
269
270         ao2_ref(pair, -1);
271         return 0;
272 }
273
274 /*!
275  * \brief Queue a task to kill a zombie thread
276  *
277  * This is called by a worker thread when it acknowledges that it is time for
278  * it to die.
279  */
280 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
281                 struct worker_thread *worker)
282 {
283         struct thread_worker_pair *pair;
284         SCOPED_AO2LOCK(lock, pool);
285         if (pool->shutting_down) {
286                 return;
287         }
288         pair = thread_worker_pair_alloc(pool, worker);
289         if (!pair) {
290                 return;
291         }
292         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
293 }
294
295 static int queued_idle_thread_dead(void *data)
296 {
297         struct thread_worker_pair *pair = data;
298
299         ao2_unlink(pair->pool->idle_threads, pair->worker);
300         threadpool_send_state_changed(pair->pool);
301
302         ao2_ref(pair, -1);
303         return 0;
304 }
305
306 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
307                 struct worker_thread *worker)
308 {
309         struct thread_worker_pair *pair;
310         SCOPED_AO2LOCK(lock, pool);
311         if (pool->shutting_down) {
312                 return;
313         }
314         pair = thread_worker_pair_alloc(pool, worker);
315         if (!pair) {
316                 return;
317         }
318         ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
319 }
320
321 /*!
322  * \brief Execute a task in the threadpool
323  *
324  * This is the function that worker threads call in order to execute tasks
325  * in the threadpool
326  *
327  * \param pool The pool to which the tasks belong.
328  * \retval 0 Either the pool has been shut down or there are no tasks.
329  * \retval 1 There are still tasks remaining in the pool.
330  */
331 static int threadpool_execute(struct ast_threadpool *pool)
332 {
333         ao2_lock(pool);
334         if (!pool->shutting_down) {
335                 ao2_unlock(pool);
336                 return ast_taskprocessor_execute(pool->tps);
337         }
338         ao2_unlock(pool);
339         return 0;
340 }
341
342 /*!
343  * \brief Destroy a threadpool's components.
344  *
345  * This is the destructor called automatically when the threadpool's
346  * reference count reaches zero. This is not to be confused with
347  * threadpool_destroy.
348  *
349  * By the time this actually gets called, most of the cleanup has already
350  * been done in the pool. The only thing left to do is to release the
351  * final reference to the threadpool listener.
352  *
353  * \param obj The pool to destroy
354  */
355 static void threadpool_destructor(void *obj)
356 {
357         struct ast_threadpool *pool = obj;
358         ao2_cleanup(pool->listener);
359 }
360
361 /*
362  * \brief Allocate a threadpool
363  *
364  * This is implemented as a taskprocessor listener's alloc callback. This
365  * is because the threadpool exists as the private data on a taskprocessor
366  * listener.
367  *
368  * \param name The name of the threadpool.
369  * \param options The options the threadpool uses.
370  * \retval NULL Could not initialize threadpool properly
371  * \retval non-NULL The newly-allocated threadpool
372  */
373 static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
374 {
375         RAII_VAR(struct ast_threadpool *, pool,
376                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
377         struct ast_str *control_tps_name = ast_str_create(64);
378
379         if (!control_tps_name) {
380                 return NULL;
381         }
382
383         ast_str_set(&control_tps_name, 0, "%s-control", name);
384
385         pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
386         ast_free(control_tps_name);
387         if (!pool->control_tps) {
388                 return NULL;
389         }
390         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
391         if (!pool->active_threads) {
392                 return NULL;
393         }
394         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
395         if (!pool->idle_threads) {
396                 return NULL;
397         }
398         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
399         if (!pool->zombie_threads) {
400                 return NULL;
401         }
402         pool->options = *options;
403
404         ao2_ref(pool, +1);
405         return pool;
406 }
407
408 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
409 {
410         return 0;
411 }
412
413 /*!
414  * \brief helper used for queued task when tasks are pushed
415  */
416 struct task_pushed_data {
417         /*! Pool into which a task was pushed */
418         struct ast_threadpool *pool;
419         /*! Indicator of whether the pool had no tasks prior to the new task being added */
420         int was_empty;
421 };
422
423 /*!
424  * \brief Allocate and initialize a task_pushed_data
425  * \param pool The threadpool to set in the task_pushed_data
426  * \param was_empty The was_empty value to set in the task_pushed_data
427  * \retval NULL Unable to allocate task_pushed_data
428  * \retval non-NULL The newly-allocated task_pushed_data
429  */
430 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
431                 int was_empty)
432 {
433         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
434
435         if (!tpd) {
436                 return NULL;
437         }
438         tpd->pool = pool;
439         tpd->was_empty = was_empty;
440         return tpd;
441 }
442
443 /*!
444  * \brief Activate idle threads
445  *
446  * This function always returns CMP_MATCH because all workers that this
447  * function acts on need to be seen as matches so they are unlinked from the
448  * list of idle threads.
449  *
450  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
451  * \param obj The worker to activate
452  * \param arg The pool where the worker belongs
453  * \retval CMP_MATCH
454  */
455 static int activate_thread(void *obj, void *arg, int flags)
456 {
457         struct worker_thread *worker = obj;
458         struct ast_threadpool *pool = arg;
459
460         if (!ao2_link(pool->active_threads, worker)) {
461                 /* If we can't link the idle thread into the active container, then
462                  * we'll just leave the thread idle and not wake it up.
463                  */
464                 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
465                                 worker->id);
466                 return 0;
467         }
468         worker_set_state(worker, ALIVE);
469         return CMP_MATCH;
470 }
471
472 /*!
473  * \brief Add threads to the threadpool
474  *
475  * This function is called from the threadpool's control taskprocessor thread.
476  * \param pool The pool that is expanding
477  * \delta The number of threads to add to the pool
478  */
479 static void grow(struct ast_threadpool *pool, int delta)
480 {
481         int i;
482
483         ast_debug(3, "Increasing threadpool %s's size by %d\n",
484                         ast_taskprocessor_name(pool->tps), delta);
485
486         for (i = 0; i < delta; ++i) {
487                 struct worker_thread *worker = worker_thread_alloc(pool);
488                 if (!worker) {
489                         return;
490                 }
491                 if (ao2_link(pool->active_threads, worker)) {
492                         if (worker_thread_start(worker)) {
493                                 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
494                                 ao2_unlink(pool->active_threads, worker);
495                         }
496                 } else {
497                         ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
498                 }
499                 ao2_ref(worker, -1);
500         }
501 }
502
503 /*!
504  * \brief Queued task called when tasks are pushed into the threadpool
505  *
506  * This function first calls into the threadpool's listener to let it know
507  * that a task has been pushed. It then wakes up all idle threads and moves
508  * them into the active thread container.
509  * \param data A task_pushed_data
510  * \return 0
511  */
512 static int queued_task_pushed(void *data)
513 {
514         struct task_pushed_data *tpd = data;
515         struct ast_threadpool *pool = tpd->pool;
516         int was_empty = tpd->was_empty;
517         int state_changed;
518
519         if (pool->listener && pool->listener->callbacks->task_pushed) {
520                 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
521         }
522         if (ao2_container_count(pool->idle_threads) == 0) {
523                 if (pool->options.auto_increment > 0) {
524                         grow(pool, pool->options.auto_increment);
525                         state_changed = 1;
526                 }
527         } else {
528                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
529                                 activate_thread, pool);
530                 state_changed = 1;
531         }
532         if (state_changed) {
533                 threadpool_send_state_changed(pool);
534         }
535         ao2_ref(tpd, -1);
536         return 0;
537 }
538
539 /*!
540  * \brief Taskprocessor listener callback called when a task is added
541  *
542  * The threadpool uses this opportunity to queue a task on its control taskprocessor
543  * in order to activate idle threads and notify the threadpool listener that the
544  * task has been pushed.
545  * \param listener The taskprocessor listener. The threadpool is the listener's private data
546  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
547  */
548 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
549                 int was_empty)
550 {
551         struct ast_threadpool *pool = listener->user_data;
552         struct task_pushed_data *tpd;
553         SCOPED_AO2LOCK(lock, pool);
554
555         if (pool->shutting_down) {
556                 return;
557         }
558         tpd = task_pushed_data_alloc(pool, was_empty);
559         if (!tpd) {
560                 return;
561         }
562
563         ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
564 }
565
566 /*!
567  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
568  *
569  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
570  * \param data The pool that has become empty
571  * \return 0
572  */
573 static int queued_emptied(void *data)
574 {
575         struct ast_threadpool *pool = data;
576
577         /* We already checked for existence of this callback when this was queued */
578         pool->listener->callbacks->emptied(pool, pool->listener);
579         return 0;
580 }
581
582 /*!
583  * \brief Taskprocessor listener emptied callback
584  *
585  * The threadpool queues a task to let the threadpool listener know that
586  * the threadpool no longer contains any tasks.
587  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
588  */
589 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
590 {
591         struct ast_threadpool *pool = listener->user_data;
592         SCOPED_AO2LOCK(lock, pool);
593
594         if (pool->shutting_down) {
595                 return;
596         }
597
598         if (pool->listener && pool->listener->callbacks->emptied) {
599                 ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
600         }
601 }
602
603 /*!
604  * \brief Taskprocessor listener shutdown callback
605  *
606  * The threadpool will shut down and destroy all of its worker threads when
607  * this is called back. By the time this gets called, the taskprocessor's
608  * control taskprocessor has already been destroyed. Therefore there is no risk
609  * in outright destroying the worker threads here.
610  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
611  */
612 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
613 {
614         struct ast_threadpool *pool = listener->user_data;
615
616         pool->listener->callbacks->shutdown(pool->listener);
617         ao2_cleanup(pool->active_threads);
618         ao2_cleanup(pool->idle_threads);
619         ao2_cleanup(pool->zombie_threads);
620         ao2_cleanup(pool);
621 }
622
623 /*!
624  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
625  */
626 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
627         .start = threadpool_tps_start,
628         .task_pushed = threadpool_tps_task_pushed,
629         .emptied = threadpool_tps_emptied,
630         .shutdown = threadpool_tps_shutdown,
631 };
632
633 /*!
634  * \brief ao2 callback to kill a set number of threads.
635  *
636  * Threads will be unlinked from the container as long as the
637  * counter has not reached zero. The counter is decremented with
638  * each thread that is removed.
639  * \param obj The worker thread up for possible destruction
640  * \param arg The counter
641  * \param flags Unused
642  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
643  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
644  */
645 static int kill_threads(void *obj, void *arg, int flags)
646 {
647         int *num_to_kill = arg;
648
649         if (*num_to_kill > 0) {
650                 --(*num_to_kill);
651                 return CMP_MATCH;
652         } else {
653                 return CMP_STOP;
654         }
655 }
656
657 /*!
658  * \brief ao2 callback to zombify a set number of threads.
659  *
660  * Threads will be zombified as long as as the counter has not reached
661  * zero. The counter is decremented with each thread that is zombified.
662  *
663  * Zombifying a thread involves removing it from its current container,
664  * adding it to the zombie container, and changing the state of the
665  * worker to a zombie
666  *
667  * This callback is called from the threadpool control taskprocessor thread.
668  *
669  * \param obj The worker thread that may be zombified
670  * \param arg The pool to which the worker belongs
671  * \param data The counter
672  * \param flags Unused
673  * \retval CMP_MATCH The zombified thread should be removed from its current container
674  * \retval CMP_STOP Stop attempting to zombify threads
675  */
676 static int zombify_threads(void *obj, void *arg, void *data, int flags)
677 {
678         struct worker_thread *worker = obj;
679         struct ast_threadpool *pool = arg;
680         int *num_to_zombify = data;
681
682         if ((*num_to_zombify)-- > 0) {
683                 if (!ao2_link(pool->zombie_threads, worker)) {
684                         ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
685                         return 0;
686                 }
687                 worker_set_state(worker, ZOMBIE);
688                 return CMP_MATCH;
689         } else {
690                 return CMP_STOP;
691         }
692 }
693
694 /*!
695  * \brief Remove threads from the threadpool
696  *
697  * The preference is to kill idle threads. However, if there are
698  * more threads to remove than there are idle threads, then active
699  * threads will be zombified instead.
700  *
701  * This function is called from the threadpool control taskprocessor thread.
702  *
703  * \param pool The threadpool to remove threads from
704  * \param delta The number of threads to remove
705  */
706 static void shrink(struct ast_threadpool *pool, int delta)
707 {
708         /*
709          * Preference is to kill idle threads, but
710          * we'll move on to deactivating active threads
711          * if we have to
712          */
713         int idle_threads = ao2_container_count(pool->idle_threads);
714         int idle_threads_to_kill = MIN(delta, idle_threads);
715         int active_threads_to_zombify = delta - idle_threads_to_kill;
716
717         ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
718                         ast_taskprocessor_name(pool->tps));
719
720         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
721                         kill_threads, &idle_threads_to_kill);
722
723         ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
724                         ast_taskprocessor_name(pool->tps));
725
726         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
727                         zombify_threads, pool, &active_threads_to_zombify);
728 }
729
730 /*!
731  * \brief Helper struct used for queued operations that change the size of the threadpool
732  */
733 struct set_size_data {
734         /*! The pool whose size is to change */
735         struct ast_threadpool *pool;
736         /*! The requested new size of the pool */
737         unsigned int size;
738 };
739
740 /*!
741  * \brief Allocate and initialize a set_size_data
742  * \param pool The pool for the set_size_data
743  * \param size The size to store in the set_size_data
744  */
745 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
746                 unsigned int size)
747 {
748         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
749         if (!ssd) {
750                 return NULL;
751         }
752
753         ssd->pool = pool;
754         ssd->size = size;
755         return ssd;
756 }
757
758 /*!
759  * \brief Change the size of the threadpool
760  *
761  * This can either result in shrinking or growing the threadpool depending
762  * on the new desired size and the current size.
763  *
764  * This function is run from the threadpool control taskprocessor thread
765  *
766  * \param data A set_size_data used for determining how to act
767  * \return 0
768  */
769 static int queued_set_size(void *data)
770 {
771         struct set_size_data *ssd = data;
772         struct ast_threadpool *pool = ssd->pool;
773         unsigned int num_threads = ssd->size;
774
775         /* We don't count zombie threads as being "live when potentially resizing */
776         unsigned int current_size = ao2_container_count(pool->active_threads) +
777                 ao2_container_count(pool->idle_threads);
778
779         if (current_size == num_threads) {
780                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
781                                 num_threads, current_size);
782                 return 0;
783         }
784
785         if (current_size < num_threads) {
786                 grow(pool, num_threads - current_size);
787         } else {
788                 shrink(pool, current_size - num_threads);
789         }
790
791         threadpool_send_state_changed(pool);
792         ao2_ref(ssd, -1);
793         return 0;
794 }
795
796 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
797 {
798         struct set_size_data *ssd;
799         SCOPED_AO2LOCK(lock, pool);
800         if (pool->shutting_down) {
801                 return;
802         }
803
804         ssd = set_size_data_alloc(pool, size);
805         if (!ssd) {
806                 return;
807         }
808
809         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
810 }
811
812 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
813                 const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
814 {
815         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
816         if (!listener) {
817                 return NULL;
818         }
819         listener->callbacks = callbacks;
820         listener->user_data = user_data;
821         return listener;
822 }
823
824 struct pool_options_pair {
825         struct ast_threadpool *pool;
826         struct ast_threadpool_options options;
827 };
828
829 struct ast_threadpool *ast_threadpool_create(const char *name,
830                 struct ast_threadpool_listener *listener,
831                 const struct ast_threadpool_options *options)
832 {
833         struct ast_taskprocessor *tps;
834         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
835         RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup);
836
837         if (!pool) {
838                 return NULL;
839         }
840         
841         tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
842         if (!tps_listener) {
843                 return NULL;
844         }
845
846         if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
847                 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
848                 return NULL;
849         }
850
851         tps = ast_taskprocessor_create_with_listener(name, tps_listener);
852         if (!tps) {
853                 return NULL;
854         }
855
856         pool->tps = tps;
857         if (listener) {
858                 ao2_ref(listener, +1);
859                 pool->listener = listener;
860         }
861         ast_threadpool_set_size(pool, pool->options.initial_size);
862         ao2_ref(pool, +1);
863         return pool;
864 }
865
866 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
867 {
868         SCOPED_AO2LOCK(lock, pool);
869         if (!pool->shutting_down) {
870                 return ast_taskprocessor_push(pool->tps, task, data);
871         }
872         return 0;
873 }
874
875 void ast_threadpool_shutdown(struct ast_threadpool *pool)
876 {
877         /* Shut down the taskprocessors and everything else just
878          * takes care of itself via the taskprocessor callbacks
879          */
880         ao2_lock(pool);
881         pool->shutting_down = 1;
882         ao2_unlock(pool);
883         ast_taskprocessor_unreference(pool->control_tps);
884         ast_taskprocessor_unreference(pool->tps);
885 }
886
887 /*!
888  * A monotonically increasing integer used for worker
889  * thread identification.
890  */
891 static int worker_id_counter;
892
893 static int worker_thread_hash(const void *obj, int flags)
894 {
895         const struct worker_thread *worker = obj;
896
897         return worker->id;
898 }
899
900 static int worker_thread_cmp(void *obj, void *arg, int flags)
901 {
902         struct worker_thread *worker1 = obj;
903         struct worker_thread *worker2 = arg;
904
905         return worker1->id == worker2->id ? CMP_MATCH : 0;
906 }
907
908 /*!
909  * \brief shut a worker thread down
910  *
911  * Set the worker dead and then wait for its thread
912  * to finish executing.
913  *
914  * \param worker The worker thread to shut down
915  */
916 static void worker_shutdown(struct worker_thread *worker)
917 {
918         worker_set_state(worker, DEAD);
919         if (worker->thread != AST_PTHREADT_NULL) {
920                 pthread_join(worker->thread, NULL);
921                 worker->thread = AST_PTHREADT_NULL;
922         }
923 }
924
925 /*!
926  * \brief Worker thread destructor
927  *
928  * Called automatically when refcount reaches 0. Shuts
929  * down the worker thread and destroys its component
930  * parts
931  */
932 static void worker_thread_destroy(void *obj)
933 {
934         struct worker_thread *worker = obj;
935         ast_debug(3, "Destroying worker thread %d\n", worker->id);
936         worker_shutdown(worker);
937         ast_mutex_destroy(&worker->lock);
938         ast_cond_destroy(&worker->cond);
939 }
940
941 /*!
942  * \brief start point for worker threads
943  *
944  * Worker threads start in the active state but may
945  * immediately go idle if there is no work to be
946  * done
947  *
948  * \param arg The worker thread
949  * \retval NULL
950  */
951 static void *worker_start(void *arg)
952 {
953         struct worker_thread *worker = arg;
954
955         worker_active(worker);
956         return NULL;
957 }
958
959 /*!
960  * \brief Allocate and initialize a new worker thread
961  *
962  * This will create, initialize, and start the thread.
963  *
964  * \param pool The threadpool to which the worker will be added
965  * \retval NULL Failed to allocate or start the worker thread
966  * \retval non-NULL The newly-created worker thread
967  */
968 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
969 {
970         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
971         if (!worker) {
972                 return NULL;
973         }
974         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
975         ast_mutex_init(&worker->lock);
976         ast_cond_init(&worker->cond, NULL);
977         worker->pool = pool;
978         worker->thread = AST_PTHREADT_NULL;
979         worker->state = ALIVE;
980         worker->options = pool->options;
981         return worker;
982 }
983
984 static int worker_thread_start(struct worker_thread *worker)
985 {
986         return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
987 }
988
989 /*!
990  * \brief Active loop for worker threads
991  *
992  * The worker will stay in this loop for its lifetime,
993  * executing tasks as they become available. If there
994  * are no tasks currently available, then the thread
995  * will go idle.
996  *
997  * \param worker The worker thread executing tasks.
998  */
999 static void worker_active(struct worker_thread *worker)
1000 {
1001         int alive = 1;
1002         while (alive) {
1003                 if (!threadpool_execute(worker->pool)) {
1004                         alive = worker_idle(worker);
1005                 }
1006         }
1007
1008         /* Reaching this portion means the thread is
1009          * on death's door. It may have been killed while
1010          * it was idle, in which case it can just die
1011          * peacefully. If it's a zombie, though, then
1012          * it needs to let the pool know so
1013          * that the thread can be removed from the
1014          * list of zombie threads.
1015          */
1016         if (worker->state == ZOMBIE) {
1017                 threadpool_zombie_thread_dead(worker->pool, worker);
1018         }
1019 }
1020
1021 /*!
1022  * \brief Idle function for worker threads
1023  *
1024  * The worker waits here until it gets told by the threadpool
1025  * to wake up.
1026  *
1027  * \param worker The idle worker
1028  * \retval 0 The thread is being woken up so that it can conclude.
1029  * \retval non-zero The thread is being woken up to do more work.
1030  */
1031 static int worker_idle(struct worker_thread *worker)
1032 {
1033         struct timeval start = ast_tvnow();
1034         struct timespec end = {
1035                 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1036                 .tv_nsec = start.tv_usec * 1000,
1037         };
1038         SCOPED_MUTEX(lock, &worker->lock);
1039         if (worker->state != ALIVE) {
1040                 return 0;
1041         }
1042         threadpool_active_thread_idle(worker->pool, worker);
1043         while (!worker->wake_up) {
1044                 if (worker->options.idle_timeout <= 0) {
1045                         ast_cond_wait(&worker->cond, lock);
1046                 } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
1047                         break;
1048                 }
1049         }
1050
1051         if (!worker->wake_up) {
1052                 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1053                 threadpool_idle_thread_dead(worker->pool, worker);
1054                 worker->state = DEAD;
1055         }
1056         worker->wake_up = 0;
1057         return worker->state == ALIVE;
1058 }
1059
1060 /*!
1061  * \brief Change a worker's state
1062  *
1063  * The threadpool calls into this function in order to let a worker know
1064  * how it should proceed.
1065  */
1066 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
1067 {
1068         SCOPED_MUTEX(lock, &worker->lock);
1069         worker->state = state;
1070         worker->wake_up = 1;
1071         ast_cond_signal(&worker->cond);
1072 }
1073