Remove alloc and destroy callbacks from the taskprocessor.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 #define THREAD_BUCKETS 89
28
29 /*!
30  * \brief An opaque threadpool structure
31  *
32  * A threadpool is a collection of threads that execute
33  * tasks from a common queue.
34  */
35 struct ast_threadpool {
36         /*! Threadpool listener */
37         struct ast_threadpool_listener *listener;
38         /*!
39          * \brief The container of active threads.
40          * Active threads are those that are currently running tasks
41          */
42         struct ao2_container *active_threads;
43         /*!
44          * \brief The container of idle threads.
45          * Idle threads are those that are currenly waiting to run tasks
46          */
47         struct ao2_container *idle_threads;
48         /*!
49          * \brief The container of zombie threads.
50          * Zombie threads may be running tasks, but they are scheduled to die soon
51          */
52         struct ao2_container *zombie_threads;
53         /*!
54          * \brief The main taskprocessor
55          *
56          * Tasks that are queued in this taskprocessor are
57          * doled out to the worker threads. Worker threads that
58          * execute tasks from the threadpool are executing tasks
59          * in this taskprocessor.
60          *
61          * The threadpool itself is actually the private data for
62          * this taskprocessor's listener. This way, as taskprocessor
63          * changes occur, the threadpool can alert its listeners
64          * appropriately.
65          */
66         struct ast_taskprocessor *tps;
67         /*!
68          * \brief The control taskprocessor
69          *
70          * This is a standard taskprocessor that uses the default
71          * taskprocessor listener. In other words, all tasks queued to
72          * this taskprocessor have a single thread that executes the
73          * tasks.
74          *
75          * All tasks that modify the state of the threadpool and all tasks
76          * that call out to threadpool listeners are pushed to this
77          * taskprocessor.
78          *
79          * For instance, when the threadpool changes sizes, a task is put
80          * into this taskprocessor to do so. When it comes time to tell the
81          * threadpool listener that worker threads have changed state,
82          * the task is placed in this taskprocessor.
83          *
84          * This is done for three main reasons
85          * 1) It ensures that listeners are given an accurate portrayal
86          * of the threadpool's current state. In other words, when a listener
87          * gets told a count of active, idle and zombie threads, it does not
88          * need to worry that internal state of the threadpool might be different
89          * from what it has been told.
90          * 2) It minimizes the locking required in both the threadpool and in
91          * threadpool listener's callbacks.
92          * 3) It ensures that listener callbacks are called in the same order
93          * that the threadpool had its state change.
94          */
95         struct ast_taskprocessor *control_tps;
96         /*! True if the threadpool is in the processof shutting down */
97         int shutting_down;
98         /*! Threadpool-specific options */
99         struct ast_threadpool_options options;
100 };
101
102 /*!
103  * \brief states for worker threads
104  */
105 enum worker_state {
106         /*! The worker is either active or idle */
107         ALIVE,
108         /*!
109          * The worker has been asked to shut down but
110          * may still be in the process of executing tasks.
111          * This transition happens when the threadpool needs
112          * to shrink and needs to kill active threads in order
113          * to do so.
114          */
115         ZOMBIE,
116         /*!
117          * The worker has been asked to shut down. Typically
118          * only idle threads go to this state directly, but
119          * active threads may go straight to this state when
120          * the threadpool is shut down.
121          */
122         DEAD,
123 };
124
125 /*!
126  * A thread that executes threadpool tasks
127  */
128 struct worker_thread {
129         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
130         int id;
131         /*! Condition used in conjunction with state changes */
132         ast_cond_t cond;
133         /*! Lock used alongside the condition for state changes */
134         ast_mutex_t lock;
135         /*! The actual thread that is executing tasks */
136         pthread_t thread;
137         /*! A pointer to the threadpool. Needed to be able to execute tasks */
138         struct ast_threadpool *pool;
139         /*! The current state of the worker thread */
140         enum worker_state state;
141         /*! A boolean used to determine if an idle thread should become active */
142         int wake_up;
143         /*! Options for this threadpool */
144         struct ast_threadpool_options options;
145 };
146
147 /* Worker thread forward declarations. See definitions for documentation */
148 static int worker_thread_hash(const void *obj, int flags);
149 static int worker_thread_cmp(void *obj, void *arg, int flags);
150 static void worker_thread_destroy(void *obj);
151 static void worker_active(struct worker_thread *worker);
152 static void *worker_start(void *arg);
153 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
154 static int worker_thread_start(struct worker_thread *worker);
155 static int worker_idle(struct worker_thread *worker);
156 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
157 static void worker_shutdown(struct worker_thread *worker);
158
159 /*!
160  * \brief Notify the threadpool listener that the state has changed.
161  *
162  * This notifies the threadpool listener via its state_changed callback.
163  * \param pool The threadpool whose state has changed
164  */
165 static void threadpool_send_state_changed(struct ast_threadpool *pool)
166 {
167         int active_size = ao2_container_count(pool->active_threads);
168         int idle_size = ao2_container_count(pool->idle_threads);
169
170         if (pool->listener && pool->listener->callbacks->state_changed) {
171                 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
172         }
173 }
174
175 /*!
176  * \brief Struct used for queued operations involving worker state changes
177  */
178 struct thread_worker_pair {
179         /*! Threadpool that contains the worker whose state has changed */
180         struct ast_threadpool *pool;
181         /*! Worker whose state has changed */
182         struct worker_thread *worker;
183 };
184
185 /*!
186  * \brief Destructor for thread_worker_pair
187  */
188 static void thread_worker_pair_destructor(void *obj)
189 {
190         struct thread_worker_pair *pair = obj;
191         ao2_ref(pair->worker, -1);
192 }
193
194 /*!
195  * \brief Allocate and initialize a thread_worker_pair
196  * \param pool Threadpool to assign to the thread_worker_pair
197  * \param worker Worker thread to assign to the thread_worker_pair
198  */
199 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
200                 struct worker_thread *worker)
201 {
202         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
203         if (!pair) {
204                 return NULL;
205         }
206         pair->pool = pool;
207         ao2_ref(worker, +1);
208         pair->worker = worker;
209         return pair;
210 }
211
212 /*!
213  * \brief Move a worker thread from the active container to the idle container.
214  *
215  * This function is called from the threadpool's control taskprocessor thread.
216  * \param data A thread_worker_pair containing the threadpool and the worker to move.
217  * \return 0
218  */
219 static int queued_active_thread_idle(void *data)
220 {
221         struct thread_worker_pair *pair = data;
222
223         ao2_link(pair->pool->idle_threads, pair->worker);
224         ao2_unlink(pair->pool->active_threads, pair->worker);
225
226         threadpool_send_state_changed(pair->pool);
227
228         ao2_ref(pair, -1);
229         return 0;
230 }
231
232 /*!
233  * \brief Queue a task to move a thread from the active list to the idle list
234  *
235  * This is called by a worker thread when it runs out of tasks to perform and
236  * goes idle.
237  * \param pool The threadpool to which the worker belongs
238  * \param worker The worker thread that has gone idle
239  */
240 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
241                 struct worker_thread *worker)
242 {
243         struct thread_worker_pair *pair;
244         SCOPED_AO2LOCK(lock, pool);
245         if (pool->shutting_down) {
246                 return;
247         }
248         pair = thread_worker_pair_alloc(pool, worker);
249         if (!pair) {
250                 return;
251         }
252         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
253 }
254
255 /*!
256  * \brief Kill a zombie thread
257  *
258  * This runs from the threadpool's control taskprocessor thread.
259  *
260  * \param data A thread_worker_pair containing the threadpool and the zombie thread
261  * \return 0
262  */
263 static int queued_zombie_thread_dead(void *data)
264 {
265         struct thread_worker_pair *pair = data;
266
267         ao2_unlink(pair->pool->zombie_threads, pair->worker);
268         threadpool_send_state_changed(pair->pool);
269
270         ao2_ref(pair, -1);
271         return 0;
272 }
273
274 /*!
275  * \brief Queue a task to kill a zombie thread
276  *
277  * This is called by a worker thread when it acknowledges that it is time for
278  * it to die.
279  */
280 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
281                 struct worker_thread *worker)
282 {
283         struct thread_worker_pair *pair;
284         SCOPED_AO2LOCK(lock, pool);
285         if (pool->shutting_down) {
286                 return;
287         }
288         pair = thread_worker_pair_alloc(pool, worker);
289         if (!pair) {
290                 return;
291         }
292         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
293 }
294
295 static int queued_idle_thread_dead(void *data)
296 {
297         struct thread_worker_pair *pair = data;
298
299         ao2_unlink(pair->pool->idle_threads, pair->worker);
300         threadpool_send_state_changed(pair->pool);
301
302         ao2_ref(pair, -1);
303         return 0;
304 }
305
306 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
307                 struct worker_thread *worker)
308 {
309         struct thread_worker_pair *pair;
310         SCOPED_AO2LOCK(lock, pool);
311         if (pool->shutting_down) {
312                 return;
313         }
314         pair = thread_worker_pair_alloc(pool, worker);
315         if (!pair) {
316                 return;
317         }
318         ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
319 }
320
321 /*!
322  * \brief Execute a task in the threadpool
323  *
324  * This is the function that worker threads call in order to execute tasks
325  * in the threadpool
326  *
327  * \param pool The pool to which the tasks belong.
328  * \retval 0 Either the pool has been shut down or there are no tasks.
329  * \retval 1 There are still tasks remaining in the pool.
330  */
331 static int threadpool_execute(struct ast_threadpool *pool)
332 {
333         ao2_lock(pool);
334         if (!pool->shutting_down) {
335                 ao2_unlock(pool);
336                 return ast_taskprocessor_execute(pool->tps);
337         }
338         ao2_unlock(pool);
339         return 0;
340 }
341
342 /*!
343  * \brief Destroy a threadpool's components.
344  *
345  * This is the destructor called automatically when the threadpool's
346  * reference count reaches zero. This is not to be confused with
347  * threadpool_destroy.
348  *
349  * By the time this actually gets called, most of the cleanup has already
350  * been done in the pool. The only thing left to do is to release the
351  * final reference to the threadpool listener.
352  *
353  * \param obj The pool to destroy
354  */
355 static void threadpool_destructor(void *obj)
356 {
357         struct ast_threadpool *pool = obj;
358         ao2_cleanup(pool->listener);
359 }
360
361 /*
362  * \brief Allocate a threadpool
363  *
364  * This is implemented as a taskprocessor listener's alloc callback. This
365  * is because the threadpool exists as the private data on a taskprocessor
366  * listener.
367  *
368  * \param name The name of the threadpool.
369  * \param options The options the threadpool uses.
370  * \retval NULL Could not initialize threadpool properly
371  * \retval non-NULL The newly-allocated threadpool
372  */
373 static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
374 {
375         RAII_VAR(struct ast_threadpool *, pool,
376                         ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
377         struct ast_str *control_tps_name = ast_str_create(64);
378
379         if (!control_tps_name) {
380                 return NULL;
381         }
382
383         ast_str_set(&control_tps_name, 0, "%s-control", name);
384
385         pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
386         ast_free(control_tps_name);
387         if (!pool->control_tps) {
388                 return NULL;
389         }
390         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
391         if (!pool->active_threads) {
392                 return NULL;
393         }
394         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
395         if (!pool->idle_threads) {
396                 return NULL;
397         }
398         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
399         if (!pool->zombie_threads) {
400                 return NULL;
401         }
402         pool->options = *options;
403
404         ao2_ref(pool, +1);
405         return pool;
406 }
407
408 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
409 {
410         return 0;
411 }
412
413 /*!
414  * \brief helper used for queued task when tasks are pushed
415  */
416 struct task_pushed_data {
417         /*! Pool into which a task was pushed */
418         struct ast_threadpool *pool;
419         /*! Indicator of whether the pool had no tasks prior to the new task being added */
420         int was_empty;
421 };
422
423 /*!
424  * \brief Allocate and initialize a task_pushed_data
425  * \param pool The threadpool to set in the task_pushed_data
426  * \param was_empty The was_empty value to set in the task_pushed_data
427  * \retval NULL Unable to allocate task_pushed_data
428  * \retval non-NULL The newly-allocated task_pushed_data
429  */
430 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
431                 int was_empty)
432 {
433         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
434
435         if (!tpd) {
436                 return NULL;
437         }
438         tpd->pool = pool;
439         tpd->was_empty = was_empty;
440         return tpd;
441 }
442
443 /*!
444  * \brief Activate idle threads
445  *
446  * This function always returns CMP_MATCH because all workers that this
447  * function acts on need to be seen as matches so they are unlinked from the
448  * list of idle threads.
449  *
450  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
451  * \param obj The worker to activate
452  * \param arg The pool where the worker belongs
453  * \retval CMP_MATCH
454  */
455 static int activate_thread(void *obj, void *arg, int flags)
456 {
457         struct worker_thread *worker = obj;
458         struct ast_threadpool *pool = arg;
459
460         if (!ao2_link(pool->active_threads, worker)) {
461                 /* If we can't link the idle thread into the active container, then
462                  * we'll just leave the thread idle and not wake it up.
463                  */
464                 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
465                                 worker->id);
466                 return 0;
467         }
468         worker_set_state(worker, ALIVE);
469         return CMP_MATCH;
470 }
471
472 /*!
473  * \brief Add threads to the threadpool
474  *
475  * This function is called from the threadpool's control taskprocessor thread.
476  * \param pool The pool that is expanding
477  * \delta The number of threads to add to the pool
478  */
479 static void grow(struct ast_threadpool *pool, int delta)
480 {
481         int i;
482
483         ast_debug(3, "Increasing threadpool %s's size by %d\n",
484                         ast_taskprocessor_name(pool->tps), delta);
485
486         for (i = 0; i < delta; ++i) {
487                 struct worker_thread *worker = worker_thread_alloc(pool);
488                 if (!worker) {
489                         return;
490                 }
491                 if (ao2_link(pool->active_threads, worker)) {
492                         if (worker_thread_start(worker)) {
493                                 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
494                                 ao2_unlink(pool->active_threads, worker);
495                         }
496                 } else {
497                         ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
498                 }
499                 ao2_ref(worker, -1);
500         }
501 }
502
503 /*!
504  * \brief Queued task called when tasks are pushed into the threadpool
505  *
506  * This function first calls into the threadpool's listener to let it know
507  * that a task has been pushed. It then wakes up all idle threads and moves
508  * them into the active thread container.
509  * \param data A task_pushed_data
510  * \return 0
511  */
512 static int queued_task_pushed(void *data)
513 {
514         struct task_pushed_data *tpd = data;
515         struct ast_threadpool *pool = tpd->pool;
516         int was_empty = tpd->was_empty;
517         int state_changed;
518
519         if (pool->listener && pool->listener->callbacks->task_pushed) {
520                 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
521         }
522         if (ao2_container_count(pool->idle_threads) == 0) {
523                 if (pool->options.auto_increment > 0) {
524                         grow(pool, pool->options.auto_increment);
525                         state_changed = 1;
526                 }
527         } else {
528                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
529                                 activate_thread, pool);
530                 state_changed = 1;
531         }
532         if (state_changed) {
533                 threadpool_send_state_changed(pool);
534         }
535         ao2_ref(tpd, -1);
536         return 0;
537 }
538
539 /*!
540  * \brief Taskprocessor listener callback called when a task is added
541  *
542  * The threadpool uses this opportunity to queue a task on its control taskprocessor
543  * in order to activate idle threads and notify the threadpool listener that the
544  * task has been pushed.
545  * \param listener The taskprocessor listener. The threadpool is the listener's private data
546  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
547  */
548 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
549                 int was_empty)
550 {
551         struct ast_threadpool *pool = listener->user_data;
552         struct task_pushed_data *tpd;
553         SCOPED_AO2LOCK(lock, pool);
554
555         if (pool->shutting_down) {
556                 return;
557         }
558         tpd = task_pushed_data_alloc(pool, was_empty);
559         if (!tpd) {
560                 return;
561         }
562
563         ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
564 }
565
566 /*!
567  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
568  *
569  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
570  * \param data The pool that has become empty
571  * \return 0
572  */
573 static int queued_emptied(void *data)
574 {
575         struct ast_threadpool *pool = data;
576
577         /* We already checked for existence of this callback when this was queued */
578         pool->listener->callbacks->emptied(pool, pool->listener);
579         return 0;
580 }
581
582 /*!
583  * \brief Taskprocessor listener emptied callback
584  *
585  * The threadpool queues a task to let the threadpool listener know that
586  * the threadpool no longer contains any tasks.
587  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
588  */
589 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
590 {
591         struct ast_threadpool *pool = listener->user_data;
592         SCOPED_AO2LOCK(lock, pool);
593
594         if (pool->shutting_down) {
595                 return;
596         }
597
598         if (pool->listener && pool->listener->callbacks->emptied) {
599                 ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
600         }
601 }
602
603 /*!
604  * \brief Taskprocessor listener shutdown callback
605  *
606  * The threadpool will shut down and destroy all of its worker threads when
607  * this is called back. By the time this gets called, the taskprocessor's
608  * control taskprocessor has already been destroyed. Therefore there is no risk
609  * in outright destroying the worker threads here.
610  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
611  */
612 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
613 {
614         struct ast_threadpool *pool = listener->user_data;
615
616         ao2_cleanup(pool->active_threads);
617         ao2_cleanup(pool->idle_threads);
618         ao2_cleanup(pool->zombie_threads);
619         ao2_cleanup(pool);
620 }
621
622 /*!
623  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
624  */
625 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
626         .start = threadpool_tps_start,
627         .task_pushed = threadpool_tps_task_pushed,
628         .emptied = threadpool_tps_emptied,
629         .shutdown = threadpool_tps_shutdown,
630 };
631
632 /*!
633  * \brief ao2 callback to kill a set number of threads.
634  *
635  * Threads will be unlinked from the container as long as the
636  * counter has not reached zero. The counter is decremented with
637  * each thread that is removed.
638  * \param obj The worker thread up for possible destruction
639  * \param arg The counter
640  * \param flags Unused
641  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
642  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
643  */
644 static int kill_threads(void *obj, void *arg, int flags)
645 {
646         int *num_to_kill = arg;
647
648         if (*num_to_kill > 0) {
649                 --(*num_to_kill);
650                 return CMP_MATCH;
651         } else {
652                 return CMP_STOP;
653         }
654 }
655
656 /*!
657  * \brief ao2 callback to zombify a set number of threads.
658  *
659  * Threads will be zombified as long as as the counter has not reached
660  * zero. The counter is decremented with each thread that is zombified.
661  *
662  * Zombifying a thread involves removing it from its current container,
663  * adding it to the zombie container, and changing the state of the
664  * worker to a zombie
665  *
666  * This callback is called from the threadpool control taskprocessor thread.
667  *
668  * \param obj The worker thread that may be zombified
669  * \param arg The pool to which the worker belongs
670  * \param data The counter
671  * \param flags Unused
672  * \retval CMP_MATCH The zombified thread should be removed from its current container
673  * \retval CMP_STOP Stop attempting to zombify threads
674  */
675 static int zombify_threads(void *obj, void *arg, void *data, int flags)
676 {
677         struct worker_thread *worker = obj;
678         struct ast_threadpool *pool = arg;
679         int *num_to_zombify = data;
680
681         if ((*num_to_zombify)-- > 0) {
682                 if (!ao2_link(pool->zombie_threads, worker)) {
683                         ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
684                         return 0;
685                 }
686                 worker_set_state(worker, ZOMBIE);
687                 return CMP_MATCH;
688         } else {
689                 return CMP_STOP;
690         }
691 }
692
693 /*!
694  * \brief Remove threads from the threadpool
695  *
696  * The preference is to kill idle threads. However, if there are
697  * more threads to remove than there are idle threads, then active
698  * threads will be zombified instead.
699  *
700  * This function is called from the threadpool control taskprocessor thread.
701  *
702  * \param pool The threadpool to remove threads from
703  * \param delta The number of threads to remove
704  */
705 static void shrink(struct ast_threadpool *pool, int delta)
706 {
707         /*
708          * Preference is to kill idle threads, but
709          * we'll move on to deactivating active threads
710          * if we have to
711          */
712         int idle_threads = ao2_container_count(pool->idle_threads);
713         int idle_threads_to_kill = MIN(delta, idle_threads);
714         int active_threads_to_zombify = delta - idle_threads_to_kill;
715
716         ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
717                         ast_taskprocessor_name(pool->tps));
718
719         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
720                         kill_threads, &idle_threads_to_kill);
721
722         ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
723                         ast_taskprocessor_name(pool->tps));
724
725         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
726                         zombify_threads, pool, &active_threads_to_zombify);
727 }
728
729 /*!
730  * \brief Helper struct used for queued operations that change the size of the threadpool
731  */
732 struct set_size_data {
733         /*! The pool whose size is to change */
734         struct ast_threadpool *pool;
735         /*! The requested new size of the pool */
736         unsigned int size;
737 };
738
739 /*!
740  * \brief Allocate and initialize a set_size_data
741  * \param pool The pool for the set_size_data
742  * \param size The size to store in the set_size_data
743  */
744 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
745                 unsigned int size)
746 {
747         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
748         if (!ssd) {
749                 return NULL;
750         }
751
752         ssd->pool = pool;
753         ssd->size = size;
754         return ssd;
755 }
756
757 /*!
758  * \brief Change the size of the threadpool
759  *
760  * This can either result in shrinking or growing the threadpool depending
761  * on the new desired size and the current size.
762  *
763  * This function is run from the threadpool control taskprocessor thread
764  *
765  * \param data A set_size_data used for determining how to act
766  * \return 0
767  */
768 static int queued_set_size(void *data)
769 {
770         struct set_size_data *ssd = data;
771         struct ast_threadpool *pool = ssd->pool;
772         unsigned int num_threads = ssd->size;
773
774         /* We don't count zombie threads as being "live when potentially resizing */
775         unsigned int current_size = ao2_container_count(pool->active_threads) +
776                 ao2_container_count(pool->idle_threads);
777
778         if (current_size == num_threads) {
779                 ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
780                                 num_threads, current_size);
781                 return 0;
782         }
783
784         if (current_size < num_threads) {
785                 grow(pool, num_threads - current_size);
786         } else {
787                 shrink(pool, current_size - num_threads);
788         }
789
790         threadpool_send_state_changed(pool);
791         ao2_ref(ssd, -1);
792         return 0;
793 }
794
795 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
796 {
797         struct set_size_data *ssd;
798         SCOPED_AO2LOCK(lock, pool);
799         if (pool->shutting_down) {
800                 return;
801         }
802
803         ssd = set_size_data_alloc(pool, size);
804         if (!ssd) {
805                 return;
806         }
807
808         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
809 }
810
811 static void listener_destructor(void *obj)
812 {
813         struct ast_threadpool_listener *listener = obj;
814
815         listener->callbacks->destroy(listener->private_data);
816 }
817
818 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
819                 const struct ast_threadpool_listener_callbacks *callbacks)
820 {
821         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
822         if (!listener) {
823                 return NULL;
824         }
825         listener->callbacks = callbacks;
826         listener->private_data = listener->callbacks->alloc(listener);
827         if (!listener->private_data) {
828                 ao2_ref(listener, -1);
829                 return NULL;
830         }
831         return listener;
832 }
833
834 struct pool_options_pair {
835         struct ast_threadpool *pool;
836         struct ast_threadpool_options options;
837 };
838
839 struct ast_threadpool *ast_threadpool_create(const char *name,
840                 struct ast_threadpool_listener *listener,
841                 int initial_size, const struct ast_threadpool_options *options)
842 {
843         struct ast_taskprocessor *tps;
844         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
845         RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup);
846
847         if (!pool) {
848                 return NULL;
849         }
850         
851         tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
852         if (!tps_listener) {
853                 return NULL;
854         }
855
856         if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
857                 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
858                 return NULL;
859         }
860
861         tps = ast_taskprocessor_create_with_listener(name, tps_listener);
862         if (!tps) {
863                 return NULL;
864         }
865
866         pool->tps = tps;
867         if (listener) {
868                 ao2_ref(listener, +1);
869                 pool->listener = listener;
870         }
871         ast_threadpool_set_size(pool, initial_size);
872         ao2_ref(pool, +1);
873         return pool;
874 }
875
876 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
877 {
878         SCOPED_AO2LOCK(lock, pool);
879         if (!pool->shutting_down) {
880                 return ast_taskprocessor_push(pool->tps, task, data);
881         }
882         return 0;
883 }
884
885 void ast_threadpool_shutdown(struct ast_threadpool *pool)
886 {
887         /* Shut down the taskprocessors and everything else just
888          * takes care of itself via the taskprocessor callbacks
889          */
890         ao2_lock(pool);
891         pool->shutting_down = 1;
892         ao2_unlock(pool);
893         ast_taskprocessor_unreference(pool->control_tps);
894         ast_taskprocessor_unreference(pool->tps);
895 }
896
897 /*!
898  * A monotonically increasing integer used for worker
899  * thread identification.
900  */
901 static int worker_id_counter;
902
903 static int worker_thread_hash(const void *obj, int flags)
904 {
905         const struct worker_thread *worker = obj;
906
907         return worker->id;
908 }
909
910 static int worker_thread_cmp(void *obj, void *arg, int flags)
911 {
912         struct worker_thread *worker1 = obj;
913         struct worker_thread *worker2 = arg;
914
915         return worker1->id == worker2->id ? CMP_MATCH : 0;
916 }
917
918 /*!
919  * \brief shut a worker thread down
920  *
921  * Set the worker dead and then wait for its thread
922  * to finish executing.
923  *
924  * \param worker The worker thread to shut down
925  */
926 static void worker_shutdown(struct worker_thread *worker)
927 {
928         worker_set_state(worker, DEAD);
929         if (worker->thread != AST_PTHREADT_NULL) {
930                 pthread_join(worker->thread, NULL);
931                 worker->thread = AST_PTHREADT_NULL;
932         }
933 }
934
935 /*!
936  * \brief Worker thread destructor
937  *
938  * Called automatically when refcount reaches 0. Shuts
939  * down the worker thread and destroys its component
940  * parts
941  */
942 static void worker_thread_destroy(void *obj)
943 {
944         struct worker_thread *worker = obj;
945         ast_debug(3, "Destroying worker thread %d\n", worker->id);
946         worker_shutdown(worker);
947         ast_mutex_destroy(&worker->lock);
948         ast_cond_destroy(&worker->cond);
949 }
950
951 /*!
952  * \brief start point for worker threads
953  *
954  * Worker threads start in the active state but may
955  * immediately go idle if there is no work to be
956  * done
957  *
958  * \param arg The worker thread
959  * \retval NULL
960  */
961 static void *worker_start(void *arg)
962 {
963         struct worker_thread *worker = arg;
964
965         worker_active(worker);
966         return NULL;
967 }
968
969 /*!
970  * \brief Allocate and initialize a new worker thread
971  *
972  * This will create, initialize, and start the thread.
973  *
974  * \param pool The threadpool to which the worker will be added
975  * \retval NULL Failed to allocate or start the worker thread
976  * \retval non-NULL The newly-created worker thread
977  */
978 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
979 {
980         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
981         if (!worker) {
982                 return NULL;
983         }
984         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
985         ast_mutex_init(&worker->lock);
986         ast_cond_init(&worker->cond, NULL);
987         worker->pool = pool;
988         worker->thread = AST_PTHREADT_NULL;
989         worker->state = ALIVE;
990         worker->options = pool->options;
991         return worker;
992 }
993
994 static int worker_thread_start(struct worker_thread *worker)
995 {
996         return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
997 }
998
999 /*!
1000  * \brief Active loop for worker threads
1001  *
1002  * The worker will stay in this loop for its lifetime,
1003  * executing tasks as they become available. If there
1004  * are no tasks currently available, then the thread
1005  * will go idle.
1006  *
1007  * \param worker The worker thread executing tasks.
1008  */
1009 static void worker_active(struct worker_thread *worker)
1010 {
1011         int alive = 1;
1012         while (alive) {
1013                 if (!threadpool_execute(worker->pool)) {
1014                         alive = worker_idle(worker);
1015                 }
1016         }
1017
1018         /* Reaching this portion means the thread is
1019          * on death's door. It may have been killed while
1020          * it was idle, in which case it can just die
1021          * peacefully. If it's a zombie, though, then
1022          * it needs to let the pool know so
1023          * that the thread can be removed from the
1024          * list of zombie threads.
1025          */
1026         if (worker->state == ZOMBIE) {
1027                 threadpool_zombie_thread_dead(worker->pool, worker);
1028         }
1029 }
1030
1031 /*!
1032  * \brief Idle function for worker threads
1033  *
1034  * The worker waits here until it gets told by the threadpool
1035  * to wake up.
1036  *
1037  * \param worker The idle worker
1038  * \retval 0 The thread is being woken up so that it can conclude.
1039  * \retval non-zero The thread is being woken up to do more work.
1040  */
1041 static int worker_idle(struct worker_thread *worker)
1042 {
1043         struct timeval start = ast_tvnow();
1044         struct timespec end = {
1045                 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1046                 .tv_nsec = start.tv_usec * 1000,
1047         };
1048         SCOPED_MUTEX(lock, &worker->lock);
1049         if (worker->state != ALIVE) {
1050                 return 0;
1051         }
1052         threadpool_active_thread_idle(worker->pool, worker);
1053         while (!worker->wake_up) {
1054                 if (worker->options.idle_timeout <= 0) {
1055                         ast_cond_wait(&worker->cond, lock);
1056                 } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
1057                         break;
1058                 }
1059         }
1060
1061         if (!worker->wake_up) {
1062                 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1063                 threadpool_idle_thread_dead(worker->pool, worker);
1064                 worker->state = DEAD;
1065         }
1066         worker->wake_up = 0;
1067         return worker->state == ALIVE;
1068 }
1069
1070 /*!
1071  * \brief Change a worker's state
1072  *
1073  * The threadpool calls into this function in order to let a worker know
1074  * how it should proceed.
1075  */
1076 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
1077 {
1078         SCOPED_MUTEX(lock, &worker->lock);
1079         worker->state = state;
1080         worker->wake_up = 1;
1081         ast_cond_signal(&worker->cond);
1082 }
1083