05a5f8dd1748650acb37e9832ca09e530e7fa595
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*!
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*!
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*!
49          * \brief The container of zombie threads.
50          * Zombie threads may be running tasks, but they are scheduled to die soon
51          */
52         struct ao2_container *zombie_threads;
53         /*!
54          * \brief The main taskprocessor
55          *
56          * Tasks that are queued in this taskprocessor are
57          * doled out to the worker threads. Worker threads that
58          * execute tasks from the threadpool are executing tasks
59          * in this taskprocessor.
60          *
61          * The threadpool itself is actually the private data for
62          * this taskprocessor's listener. This way, as taskprocessor
63          * changes occur, the threadpool can alert its listeners
64          * appropriately.
65          */
66         struct ast_taskprocessor *tps;
67         /*!
68          * \brief The control taskprocessor
69          *
70          * This is a standard taskprocessor that uses the default
71          * taskprocessor listener. In other words, all tasks queued to
72          * this taskprocessor have a single thread that executes the
73          * tasks.
74          *
75          * All tasks that modify the state of the threadpool and all tasks
76          * that call out to threadpool listeners are pushed to this
77          * taskprocessor.
78          *
79          * For instance, when the threadpool changes sizes, a task is put
80          * into this taskprocessor to do so. When it comes time to tell the
81          * threadpool listener that worker threads have changed state,
82          * the task is placed in this taskprocessor.
83          *
84          * This is done for three main reasons
85          * 1) It ensures that listeners are given an accurate portrayal
86          * of the threadpool's current state. In other words, when a listener
87          * gets told a count of active, idle and zombie threads, it does not
88          * need to worry that internal state of the threadpool might be different
89          * from what it has been told.
90          * 2) It minimizes the locking required in both the threadpool and in
91          * threadpool listener's callbacks.
92          * 3) It ensures that listener callbacks are called in the same order
93          * that the threadpool had its state change.
94          */
95         struct ast_taskprocessor *control_tps;
96         /*! True if the threadpool is in the processof shutting down */
97         int shutting_down;
98         /*! Threadpool-specific options */
99         struct ast_threadpool_options options;
100 };
101
102 /*!
103  * \brief states for worker threads
104  */
105 enum worker_state {
106         /*! The worker is either active or idle */
107         ALIVE,
108         /*!
109          * The worker has been asked to shut down but
110          * may still be in the process of executing tasks.
111          * This transition happens when the threadpool needs
112          * to shrink and needs to kill active threads in order
113          * to do so.
114          */
115         ZOMBIE,
116         /*!
117          * The worker has been asked to shut down. Typically
118          * only idle threads go to this state directly, but
119          * active threads may go straight to this state when
120          * the threadpool is shut down.
121          */
122         DEAD,
123 };
124
125 /*!
126  * A thread that executes threadpool tasks
127  */
128 struct worker_thread {
129         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
130         int id;
131         /*! Condition used in conjunction with state changes */
132         ast_cond_t cond;
133         /*! Lock used alongside the condition for state changes */
134         ast_mutex_t lock;
135         /*! The actual thread that is executing tasks */
136         pthread_t thread;
137         /*! A pointer to the threadpool. Needed to be able to execute tasks */
138         struct ast_threadpool *pool;
139         /*! The current state of the worker thread */
140         enum worker_state state;
141         /*! A boolean used to determine if an idle thread should become active */
142         int wake_up;
143         /*! Options for this threadpool */
144         struct ast_threadpool_options options;
145 };
146
147 /* Worker thread forward declarations. See definitions for documentation */
148 static int worker_thread_hash(const void *obj, int flags);
149 static int worker_thread_cmp(void *obj, void *arg, int flags);
150 static void worker_thread_destroy(void *obj);
151 static void worker_active(struct worker_thread *worker);
152 static void *worker_start(void *arg);
153 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
154 static int worker_thread_start(struct worker_thread *worker);
155 static int worker_idle(struct worker_thread *worker);
156 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
157 static void worker_shutdown(struct worker_thread *worker);
158
159 /*!
160  * \brief Notify the threadpool listener that the state has changed.
161  *
162  * This notifies the threadpool listener via its state_changed callback.
163  * \param pool The threadpool whose state has changed
164  */
165 static void threadpool_send_state_changed(struct ast_threadpool *pool)
166 {
167         int active_size = ao2_container_count(pool->active_threads);
168         int idle_size = ao2_container_count(pool->idle_threads);
169
170         if (pool->listener && pool->listener->callbacks->state_changed) {
171                 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
172         }
173 }
174
175 /*!
176  * \brief Struct used for queued operations involving worker state changes
177  */
178 struct thread_worker_pair {
179         /*! Threadpool that contains the worker whose state has changed */
180         struct ast_threadpool *pool;
181         /*! Worker whose state has changed */
182         struct worker_thread *worker;
183 };
184
185 /*!
186  * \brief Destructor for thread_worker_pair
187  */
188 static void thread_worker_pair_destructor(void *obj)
189 {
190         struct thread_worker_pair *pair = obj;
191         ao2_ref(pair->worker, -1);
192 }
193
194 /*!
195  * \brief Allocate and initialize a thread_worker_pair
196  * \param pool Threadpool to assign to the thread_worker_pair
197  * \param worker Worker thread to assign to the thread_worker_pair
198  */
199 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
200                 struct worker_thread *worker)
201 {
202         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
203         if (!pair) {
204                 return NULL;
205         }
206         pair->pool = pool;
207         ao2_ref(worker, +1);
208         pair->worker = worker;
209         return pair;
210 }
211
212 /*!
213  * \brief Move a worker thread from the active container to the idle container.
214  *
215  * This function is called from the threadpool's control taskprocessor thread.
216  * \param data A thread_worker_pair containing the threadpool and the worker to move.
217  * \return 0
218  */
219 static int queued_active_thread_idle(void *data)
220 {
221         struct thread_worker_pair *pair = data;
222
223         ao2_link(pair->pool->idle_threads, pair->worker);
224         ao2_unlink(pair->pool->active_threads, pair->worker);
225
226         threadpool_send_state_changed(pair->pool);
227
228         ao2_ref(pair, -1);
229         return 0;
230 }
231
232 /*!
233  * \brief Queue a task to move a thread from the active list to the idle list
234  *
235  * This is called by a worker thread when it runs out of tasks to perform and
236  * goes idle.
237  * \param pool The threadpool to which the worker belongs
238  * \param worker The worker thread that has gone idle
239  */
240 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
241                 struct worker_thread *worker)
242 {
243         struct thread_worker_pair *pair;
244         SCOPED_AO2LOCK(lock, pool);
245         if (pool->shutting_down) {
246                 return;
247         }
248         pair = thread_worker_pair_alloc(pool, worker);
249         if (!pair) {
250                 return;
251         }
252         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
253 }
254
255 /*!
256  * \brief Kill a zombie thread
257  *
258  * This runs from the threadpool's control taskprocessor thread.
259  *
260  * \param data A thread_worker_pair containing the threadpool and the zombie thread
261  * \return 0
262  */
263 static int queued_zombie_thread_dead(void *data)
264 {
265         struct thread_worker_pair *pair = data;
266
267         ao2_unlink(pair->pool->zombie_threads, pair->worker);
268         threadpool_send_state_changed(pair->pool);
269
270         ao2_ref(pair, -1);
271         return 0;
272 }
273
274 /*!
275  * \brief Queue a task to kill a zombie thread
276  *
277  * This is called by a worker thread when it acknowledges that it is time for
278  * it to die.
279  */
280 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
281                 struct worker_thread *worker)
282 {
283         struct thread_worker_pair *pair;
284         SCOPED_AO2LOCK(lock, pool);
285         if (pool->shutting_down) {
286                 return;
287         }
288         pair = thread_worker_pair_alloc(pool, worker);
289         if (!pair) {
290                 return;
291         }
292         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
293 }
294
295 static int queued_idle_thread_dead(void *data)
296 {
297         struct thread_worker_pair *pair = data;
298
299         ao2_unlink(pair->pool->idle_threads, pair->worker);
300         threadpool_send_state_changed(pair->pool);
301
302         ao2_ref(pair, -1);
303         return 0;
304 }
305
306 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
307                 struct worker_thread *worker)
308 {
309         struct thread_worker_pair *pair;
310         SCOPED_AO2LOCK(lock, pool);
311         if (pool->shutting_down) {
312                 return;
313         }
314         pair = thread_worker_pair_alloc(pool, worker);
315         if (!pair) {
316                 return;
317         }
318         ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
319 }
320
321 /*!
322  * \brief Execute a task in the threadpool
323  *
324  * This is the function that worker threads call in order to execute tasks
325  * in the threadpool
326  *
327  * \param pool The pool to which the tasks belong.
328  * \retval 0 Either the pool has been shut down or there are no tasks.
329  * \retval 1 There are still tasks remaining in the pool.
330  */
331 static int threadpool_execute(struct ast_threadpool *pool)
332 {
333         ao2_lock(pool);
334         if (!pool->shutting_down) {
335                 ao2_unlock(pool);
336                 return ast_taskprocessor_execute(pool->tps);
337         }
338         ao2_unlock(pool);
339         return 0;
340 }
341
342 /*!
343  * \brief Destroy a threadpool's components.
344  *
345  * This is the destructor called automatically when the threadpool's
346  * reference count reaches zero. This is not to be confused with
347  * threadpool_destroy.
348  *
349  * By the time this actually gets called, most of the cleanup has already
350  * been done in the pool. The only thing left to do is to release the
351  * final reference to the threadpool listener.
352  *
353  * \param obj The pool to destroy
354  */
355 static void threadpool_destructor(void *obj)
356 {
357         struct ast_threadpool *pool = obj;
358         ao2_cleanup(pool->listener);
359 }
360
361 /*
362  * \brief Allocate a threadpool
363  *
364  * This is implemented as a taskprocessor listener's alloc callback. This
365  * is because the threadpool exists as the private data on a taskprocessor
366  * listener.
367  *
368  * \param listener The taskprocessor listener where the threadpool will live.
369  * \retval NULL Could not initialize threadpool properly
370  * \retval non-NULL The newly-allocated threadpool
371  */
372 static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
373 {
374         RAII_VAR(struct ast_threadpool *, pool,
375                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
376         struct ast_str *name = ast_str_create(64);
377
378         if (!name) {
379                 return NULL;
380         }
381
382         ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps));
383
384         pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT);
385         if (!pool->control_tps) {
386                 return NULL;
387         }
388         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
389         if (!pool->active_threads) {
390                 return NULL;
391         }
392         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
393         if (!pool->idle_threads) {
394                 return NULL;
395         }
396         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
397         if (!pool->zombie_threads) {
398                 return NULL;
399         }
400
401         ao2_ref(pool, +1);
402         return pool;
403 }
404
405 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
406 {
407         return 0;
408 }
409
410 /*!
411  * \brief helper used for queued task when tasks are pushed
412  */
413 struct task_pushed_data {
414         /*! Pool into which a task was pushed */
415         struct ast_threadpool *pool;
416         /*! Indicator of whether the pool had no tasks prior to the new task being added */
417         int was_empty;
418 };
419
420 /*!
421  * \brief Allocate and initialize a task_pushed_data
422  * \param pool The threadpool to set in the task_pushed_data
423  * \param was_empty The was_empty value to set in the task_pushed_data
424  * \retval NULL Unable to allocate task_pushed_data
425  * \retval non-NULL The newly-allocated task_pushed_data
426  */
427 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
428                 int was_empty)
429 {
430         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
431
432         if (!tpd) {
433                 return NULL;
434         }
435         tpd->pool = pool;
436         tpd->was_empty = was_empty;
437         return tpd;
438 }
439
440 /*!
441  * \brief Activate idle threads
442  *
443  * This function always returns CMP_MATCH because all workers that this
444  * function acts on need to be seen as matches so they are unlinked from the
445  * list of idle threads.
446  *
447  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
448  * \param obj The worker to activate
449  * \param arg The pool where the worker belongs
450  * \retval CMP_MATCH
451  */
452 static int activate_thread(void *obj, void *arg, int flags)
453 {
454         struct worker_thread *worker = obj;
455         struct ast_threadpool *pool = arg;
456
457         if (!ao2_link(pool->active_threads, worker)) {
458                 /* If we can't link the idle thread into the active container, then
459                  * we'll just leave the thread idle and not wake it up.
460                  */
461                 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
462                                 worker->id);
463                 return 0;
464         }
465         worker_set_state(worker, ALIVE);
466         return CMP_MATCH;
467 }
468
469 /*!
470  * \brief Add threads to the threadpool
471  *
472  * This function is called from the threadpool's control taskprocessor thread.
473  * \param pool The pool that is expanding
474  * \delta The number of threads to add to the pool
475  */
476 static void grow(struct ast_threadpool *pool, int delta)
477 {
478         int i;
479
480         ast_debug(3, "Increasing threadpool %s's size by %d\n",
481                         ast_taskprocessor_name(pool->tps), delta);
482
483         for (i = 0; i < delta; ++i) {
484                 struct worker_thread *worker = worker_thread_alloc(pool);
485                 if (!worker) {
486                         return;
487                 }
488                 if (ao2_link(pool->active_threads, worker)) {
489                         if (worker_thread_start(worker)) {
490                                 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
491                                 ao2_unlink(pool->active_threads, worker);
492                         }
493                 } else {
494                         ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
495                 }
496                 ao2_ref(worker, -1);
497         }
498 }
499
500 /*!
501  * \brief Queued task called when tasks are pushed into the threadpool
502  *
503  * This function first calls into the threadpool's listener to let it know
504  * that a task has been pushed. It then wakes up all idle threads and moves
505  * them into the active thread container.
506  * \param data A task_pushed_data
507  * \return 0
508  */
509 static int queued_task_pushed(void *data)
510 {
511         struct task_pushed_data *tpd = data;
512         struct ast_threadpool *pool = tpd->pool;
513         int was_empty = tpd->was_empty;
514         int state_changed;
515
516         if (pool->listener && pool->listener->callbacks->task_pushed) {
517                 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
518         }
519         if (ao2_container_count(pool->idle_threads) == 0) {
520                 if (pool->options.auto_increment > 0) {
521                         grow(pool, pool->options.auto_increment);
522                         state_changed = 1;
523                 }
524         } else {
525                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
526                                 activate_thread, pool);
527                 state_changed = 1;
528         }
529         if (state_changed) {
530                 threadpool_send_state_changed(pool);
531         }
532         ao2_ref(tpd, -1);
533         return 0;
534 }
535
536 /*!
537  * \brief Taskprocessor listener callback called when a task is added
538  *
539  * The threadpool uses this opportunity to queue a task on its control taskprocessor
540  * in order to activate idle threads and notify the threadpool listener that the
541  * task has been pushed.
542  * \param listener The taskprocessor listener. The threadpool is the listener's private data
543  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
544  */
545 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
546                 int was_empty)
547 {
548         struct ast_threadpool *pool = listener->private_data;
549         struct task_pushed_data *tpd;
550         SCOPED_AO2LOCK(lock, pool);
551
552         if (pool->shutting_down) {
553                 return;
554         }
555         tpd = task_pushed_data_alloc(pool, was_empty);
556         if (!tpd) {
557                 return;
558         }
559
560         ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
561 }
562
563 /*!
564  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
565  *
566  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
567  * \param data The pool that has become empty
568  * \return 0
569  */
570 static int queued_emptied(void *data)
571 {
572         struct ast_threadpool *pool = data;
573
574         /* We already checked for existence of this callback when this was queued */
575         pool->listener->callbacks->emptied(pool, pool->listener);
576         return 0;
577 }
578
579 /*!
580  * \brief Taskprocessor listener emptied callback
581  *
582  * The threadpool queues a task to let the threadpool listener know that
583  * the threadpool no longer contains any tasks.
584  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
585  */
586 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
587 {
588         struct ast_threadpool *pool = listener->private_data;
589         SCOPED_AO2LOCK(lock, pool);
590
591         if (pool->shutting_down) {
592                 return;
593         }
594
595         if (pool->listener && pool->listener->callbacks->emptied) {
596                 ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
597         }
598 }
599
600 /*!
601  * \brief Taskprocessor listener shutdown callback
602  *
603  * The threadpool will shut down and destroy all of its worker threads when
604  * this is called back. By the time this gets called, the taskprocessor's
605  * control taskprocessor has already been destroyed. Therefore there is no risk
606  * in outright destroying the worker threads here.
607  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
608  */
609 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
610 {
611         struct ast_threadpool *pool = listener->private_data;
612
613         ao2_cleanup(pool->active_threads);
614         ao2_cleanup(pool->idle_threads);
615         ao2_cleanup(pool->zombie_threads);
616 }
617
618 /*!
619  * \brief Taskprocessor listener destroy callback
620  *
621  * Since the threadpool is an ao2 object, all that is necessary is to
622  * decrease the refcount. Since the control taskprocessor should already
623  * be destroyed by this point, this should be the final reference to the
624  * threadpool.
625  *
626  * \param private_data The threadpool to destroy
627  */
628 static void threadpool_destroy(void *private_data)
629 {
630         struct ast_threadpool *pool = private_data;
631         ao2_cleanup(pool);
632 }
633
634 /*!
635  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
636  */
637 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
638         .alloc = threadpool_alloc,
639         .start = threadpool_tps_start,
640         .task_pushed = threadpool_tps_task_pushed,
641         .emptied = threadpool_tps_emptied,
642         .shutdown = threadpool_tps_shutdown,
643         .destroy = threadpool_destroy,
644 };
645
646 /*!
647  * \brief ao2 callback to kill a set number of threads.
648  *
649  * Threads will be unlinked from the container as long as the
650  * counter has not reached zero. The counter is decremented with
651  * each thread that is removed.
652  * \param obj The worker thread up for possible destruction
653  * \param arg The counter
654  * \param flags Unused
655  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
656  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
657  */
658 static int kill_threads(void *obj, void *arg, int flags)
659 {
660         int *num_to_kill = arg;
661
662         if (*num_to_kill > 0) {
663                 --(*num_to_kill);
664                 return CMP_MATCH;
665         } else {
666                 return CMP_STOP;
667         }
668 }
669
670 /*!
671  * \brief ao2 callback to zombify a set number of threads.
672  *
673  * Threads will be zombified as long as as the counter has not reached
674  * zero. The counter is decremented with each thread that is zombified.
675  *
676  * Zombifying a thread involves removing it from its current container,
677  * adding it to the zombie container, and changing the state of the
678  * worker to a zombie
679  *
680  * This callback is called from the threadpool control taskprocessor thread.
681  *
682  * \param obj The worker thread that may be zombified
683  * \param arg The pool to which the worker belongs
684  * \param data The counter
685  * \param flags Unused
686  * \retval CMP_MATCH The zombified thread should be removed from its current container
687  * \retval CMP_STOP Stop attempting to zombify threads
688  */
689 static int zombify_threads(void *obj, void *arg, void *data, int flags)
690 {
691         struct worker_thread *worker = obj;
692         struct ast_threadpool *pool = arg;
693         int *num_to_zombify = data;
694
695         if ((*num_to_zombify)-- > 0) {
696                 if (!ao2_link(pool->zombie_threads, worker)) {
697                         ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
698                         return 0;
699                 }
700                 worker_set_state(worker, ZOMBIE);
701                 return CMP_MATCH;
702         } else {
703                 return CMP_STOP;
704         }
705 }
706
707 /*!
708  * \brief Remove threads from the threadpool
709  *
710  * The preference is to kill idle threads. However, if there are
711  * more threads to remove than there are idle threads, then active
712  * threads will be zombified instead.
713  *
714  * This function is called from the threadpool control taskprocessor thread.
715  *
716  * \param pool The threadpool to remove threads from
717  * \param delta The number of threads to remove
718  */
719 static void shrink(struct ast_threadpool *pool, int delta)
720 {
721         /*
722          * Preference is to kill idle threads, but
723          * we'll move on to deactivating active threads
724          * if we have to
725          */
726         int idle_threads = ao2_container_count(pool->idle_threads);
727         int idle_threads_to_kill = MIN(delta, idle_threads);
728         int active_threads_to_zombify = delta - idle_threads_to_kill;
729
730         ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
731                         ast_taskprocessor_name(pool->tps));
732
733         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
734                         kill_threads, &idle_threads_to_kill);
735
736         ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
737                         ast_taskprocessor_name(pool->tps));
738
739         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
740                         zombify_threads, pool, &active_threads_to_zombify);
741 }
742
743 /*!
744  * \brief Helper struct used for queued operations that change the size of the threadpool
745  */
746 struct set_size_data {
747         /*! The pool whose size is to change */
748         struct ast_threadpool *pool;
749         /*! The requested new size of the pool */
750         unsigned int size;
751 };
752
753 /*!
754  * \brief Allocate and initialize a set_size_data
755  * \param pool The pool for the set_size_data
756  * \param size The size to store in the set_size_data
757  */
758 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
759                 unsigned int size)
760 {
761         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
762         if (!ssd) {
763                 return NULL;
764         }
765
766         ssd->pool = pool;
767         ssd->size = size;
768         return ssd;
769 }
770
771 /*!
772  * \brief Change the size of the threadpool
773  *
774  * This can either result in shrinking or growing the threadpool depending
775  * on the new desired size and the current size.
776  *
777  * This function is run from the threadpool control taskprocessor thread
778  *
779  * \param data A set_size_data used for determining how to act
780  * \return 0
781  */
782 static int queued_set_size(void *data)
783 {
784         struct set_size_data *ssd = data;
785         struct ast_threadpool *pool = ssd->pool;
786         unsigned int num_threads = ssd->size;
787
788         /* We don't count zombie threads as being "live when potentially resizing */
789         unsigned int current_size = ao2_container_count(pool->active_threads) +
790                 ao2_container_count(pool->idle_threads);
791
792         if (current_size == num_threads) {
793                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
794                                 num_threads, current_size);
795                 return 0;
796         }
797
798         if (current_size < num_threads) {
799                 grow(pool, num_threads - current_size);
800         } else {
801                 shrink(pool, current_size - num_threads);
802         }
803
804         threadpool_send_state_changed(pool);
805         ao2_ref(ssd, -1);
806         return 0;
807 }
808
809 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
810 {
811         struct set_size_data *ssd;
812         SCOPED_AO2LOCK(lock, pool);
813         if (pool->shutting_down) {
814                 return;
815         }
816
817         ssd = set_size_data_alloc(pool, size);
818         if (!ssd) {
819                 return;
820         }
821
822         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
823 }
824
825 static void listener_destructor(void *obj)
826 {
827         struct ast_threadpool_listener *listener = obj;
828
829         listener->callbacks->destroy(listener->private_data);
830 }
831
832 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
833                 const struct ast_threadpool_listener_callbacks *callbacks)
834 {
835         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
836         if (!listener) {
837                 return NULL;
838         }
839         listener->callbacks = callbacks;
840         listener->private_data = listener->callbacks->alloc(listener);
841         if (!listener->private_data) {
842                 ao2_ref(listener, -1);
843                 return NULL;
844         }
845         return listener;
846 }
847
848 struct pool_options_pair {
849         struct ast_threadpool *pool;
850         struct ast_threadpool_options options;
851 };
852
853 struct ast_threadpool *ast_threadpool_create(const char *name,
854                 struct ast_threadpool_listener *listener,
855                 int initial_size, const struct ast_threadpool_options *options)
856 {
857         struct ast_threadpool *pool;
858         struct ast_taskprocessor *tps;
859         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
860                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
861                         ao2_cleanup);
862
863         if (!tps_listener) {
864                 return NULL;
865         }
866
867         if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
868                 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
869                 return NULL;
870         }
871
872         tps = ast_taskprocessor_create_with_listener(name, tps_listener);
873
874         if (!tps) {
875                 return NULL;
876         }
877
878         pool = tps_listener->private_data;
879         pool->tps = tps;
880         if (listener) {
881                 ao2_ref(listener, +1);
882                 pool->listener = listener;
883         }
884         pool->options = *options;
885         ast_threadpool_set_size(pool, initial_size);
886         return pool;
887 }
888
889 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
890 {
891         SCOPED_AO2LOCK(lock, pool);
892         if (!pool->shutting_down) {
893                 return ast_taskprocessor_push(pool->tps, task, data);
894         }
895         return 0;
896 }
897
898 void ast_threadpool_shutdown(struct ast_threadpool *pool)
899 {
900         /* Shut down the taskprocessors and everything else just
901          * takes care of itself via the taskprocessor callbacks
902          */
903         ao2_lock(pool);
904         pool->shutting_down = 1;
905         ao2_unlock(pool);
906         ast_taskprocessor_unreference(pool->control_tps);
907         ast_taskprocessor_unreference(pool->tps);
908 }
909
910 /*!
911  * A monotonically increasing integer used for worker
912  * thread identification.
913  */
914 static int worker_id_counter;
915
916 static int worker_thread_hash(const void *obj, int flags)
917 {
918         const struct worker_thread *worker = obj;
919
920         return worker->id;
921 }
922
923 static int worker_thread_cmp(void *obj, void *arg, int flags)
924 {
925         struct worker_thread *worker1 = obj;
926         struct worker_thread *worker2 = arg;
927
928         return worker1->id == worker2->id ? CMP_MATCH : 0;
929 }
930
931 /*!
932  * \brief shut a worker thread down
933  *
934  * Set the worker dead and then wait for its thread
935  * to finish executing.
936  *
937  * \param worker The worker thread to shut down
938  */
939 static void worker_shutdown(struct worker_thread *worker)
940 {
941         worker_set_state(worker, DEAD);
942         if (worker->thread != AST_PTHREADT_NULL) {
943                 pthread_join(worker->thread, NULL);
944                 worker->thread = AST_PTHREADT_NULL;
945         }
946 }
947
948 /*!
949  * \brief Worker thread destructor
950  *
951  * Called automatically when refcount reaches 0. Shuts
952  * down the worker thread and destroys its component
953  * parts
954  */
955 static void worker_thread_destroy(void *obj)
956 {
957         struct worker_thread *worker = obj;
958         ast_debug(3, "Destroying worker thread %d\n", worker->id);
959         worker_shutdown(worker);
960         ast_mutex_destroy(&worker->lock);
961         ast_cond_destroy(&worker->cond);
962 }
963
964 /*!
965  * \brief start point for worker threads
966  *
967  * Worker threads start in the active state but may
968  * immediately go idle if there is no work to be
969  * done
970  *
971  * \param arg The worker thread
972  * \retval NULL
973  */
974 static void *worker_start(void *arg)
975 {
976         struct worker_thread *worker = arg;
977
978         worker_active(worker);
979         return NULL;
980 }
981
982 /*!
983  * \brief Allocate and initialize a new worker thread
984  *
985  * This will create, initialize, and start the thread.
986  *
987  * \param pool The threadpool to which the worker will be added
988  * \retval NULL Failed to allocate or start the worker thread
989  * \retval non-NULL The newly-created worker thread
990  */
991 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
992 {
993         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
994         if (!worker) {
995                 return NULL;
996         }
997         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
998         ast_mutex_init(&worker->lock);
999         ast_cond_init(&worker->cond, NULL);
1000         worker->pool = pool;
1001         worker->thread = AST_PTHREADT_NULL;
1002         worker->state = ALIVE;
1003         worker->options = pool->options;
1004         return worker;
1005 }
1006
1007 static int worker_thread_start(struct worker_thread *worker)
1008 {
1009         return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1010 }
1011
1012 /*!
1013  * \brief Active loop for worker threads
1014  *
1015  * The worker will stay in this loop for its lifetime,
1016  * executing tasks as they become available. If there
1017  * are no tasks currently available, then the thread
1018  * will go idle.
1019  *
1020  * \param worker The worker thread executing tasks.
1021  */
1022 static void worker_active(struct worker_thread *worker)
1023 {
1024         int alive = 1;
1025         while (alive) {
1026                 if (!threadpool_execute(worker->pool)) {
1027                         alive = worker_idle(worker);
1028                 }
1029         }
1030
1031         /* Reaching this portion means the thread is
1032          * on death's door. It may have been killed while
1033          * it was idle, in which case it can just die
1034          * peacefully. If it's a zombie, though, then
1035          * it needs to let the pool know so
1036          * that the thread can be removed from the
1037          * list of zombie threads.
1038          */
1039         if (worker->state == ZOMBIE) {
1040                 threadpool_zombie_thread_dead(worker->pool, worker);
1041         }
1042 }
1043
1044 /*!
1045  * \brief Idle function for worker threads
1046  *
1047  * The worker waits here until it gets told by the threadpool
1048  * to wake up.
1049  *
1050  * \param worker The idle worker
1051  * \retval 0 The thread is being woken up so that it can conclude.
1052  * \retval non-zero The thread is being woken up to do more work.
1053  */
1054 static int worker_idle(struct worker_thread *worker)
1055 {
1056         struct timeval start = ast_tvnow();
1057         struct timespec end = {
1058                 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1059                 .tv_nsec = start.tv_usec * 1000,
1060         };
1061         SCOPED_MUTEX(lock, &worker->lock);
1062         if (worker->state != ALIVE) {
1063                 return 0;
1064         }
1065         threadpool_active_thread_idle(worker->pool, worker);
1066         while (!worker->wake_up) {
1067                 if (worker->options.idle_timeout <= 0) {
1068                         ast_cond_wait(&worker->cond, lock);
1069                 } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
1070                         break;
1071                 }
1072         }
1073
1074         if (!worker->wake_up) {
1075                 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1076                 threadpool_idle_thread_dead(worker->pool, worker);
1077                 worker->state = DEAD;
1078         }
1079         worker->wake_up = 0;
1080         return worker->state == ALIVE;
1081 }
1082
1083 /*!
1084  * \brief Change a worker's state
1085  *
1086  * The threadpool calls into this function in order to let a worker know
1087  * how it should proceed.
1088  */
1089 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
1090 {
1091         SCOPED_MUTEX(lock, &worker->lock);
1092         worker->state = state;
1093         worker->wake_up = 1;
1094         ast_cond_signal(&worker->cond);
1095 }
1096