CI: Various updates to buildAsterisk.sh
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 /* Needs to stay prime if increased */
28 #define THREAD_BUCKETS 89
29
30 /*!
31  * \brief An opaque threadpool structure
32  *
33  * A threadpool is a collection of threads that execute
34  * tasks from a common queue.
35  */
36 struct ast_threadpool {
37         /*! Threadpool listener */
38         struct ast_threadpool_listener *listener;
39         /*!
40          * \brief The container of active threads.
41          * Active threads are those that are currently running tasks
42          */
43         struct ao2_container *active_threads;
44         /*!
45          * \brief The container of idle threads.
46          * Idle threads are those that are currenly waiting to run tasks
47          */
48         struct ao2_container *idle_threads;
49         /*!
50          * \brief The container of zombie threads.
51          * Zombie threads may be running tasks, but they are scheduled to die soon
52          */
53         struct ao2_container *zombie_threads;
54         /*!
55          * \brief The main taskprocessor
56          *
57          * Tasks that are queued in this taskprocessor are
58          * doled out to the worker threads. Worker threads that
59          * execute tasks from the threadpool are executing tasks
60          * in this taskprocessor.
61          *
62          * The threadpool itself is actually the private data for
63          * this taskprocessor's listener. This way, as taskprocessor
64          * changes occur, the threadpool can alert its listeners
65          * appropriately.
66          */
67         struct ast_taskprocessor *tps;
68         /*!
69          * \brief The control taskprocessor
70          *
71          * This is a standard taskprocessor that uses the default
72          * taskprocessor listener. In other words, all tasks queued to
73          * this taskprocessor have a single thread that executes the
74          * tasks.
75          *
76          * All tasks that modify the state of the threadpool and all tasks
77          * that call out to threadpool listeners are pushed to this
78          * taskprocessor.
79          *
80          * For instance, when the threadpool changes sizes, a task is put
81          * into this taskprocessor to do so. When it comes time to tell the
82          * threadpool listener that worker threads have changed state,
83          * the task is placed in this taskprocessor.
84          *
85          * This is done for three main reasons
86          * 1) It ensures that listeners are given an accurate portrayal
87          * of the threadpool's current state. In other words, when a listener
88          * gets told a count of active, idle and zombie threads, it does not
89          * need to worry that internal state of the threadpool might be different
90          * from what it has been told.
91          * 2) It minimizes the locking required in both the threadpool and in
92          * threadpool listener's callbacks.
93          * 3) It ensures that listener callbacks are called in the same order
94          * that the threadpool had its state change.
95          */
96         struct ast_taskprocessor *control_tps;
97         /*! True if the threadpool is in the process of shutting down */
98         int shutting_down;
99         /*! Threadpool-specific options */
100         struct ast_threadpool_options options;
101 };
102
103 /*!
104  * \brief listener for a threadpool
105  *
106  * The listener is notified of changes in a threadpool. It can
107  * react by doing things like increasing the number of threads
108  * in the pool
109  */
110 struct ast_threadpool_listener {
111         /*! Callbacks called by the threadpool */
112         const struct ast_threadpool_listener_callbacks *callbacks;
113         /*! User data for the listener */
114         void *user_data;
115 };
116
117 /*!
118  * \brief states for worker threads
119  */
120 enum worker_state {
121         /*! The worker is either active or idle */
122         ALIVE,
123         /*!
124          * The worker has been asked to shut down but
125          * may still be in the process of executing tasks.
126          * This transition happens when the threadpool needs
127          * to shrink and needs to kill active threads in order
128          * to do so.
129          */
130         ZOMBIE,
131         /*!
132          * The worker has been asked to shut down. Typically
133          * only idle threads go to this state directly, but
134          * active threads may go straight to this state when
135          * the threadpool is shut down.
136          */
137         DEAD,
138 };
139
140 /*!
141  * A thread that executes threadpool tasks
142  */
143 struct worker_thread {
144         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
145         int id;
146         /*! Condition used in conjunction with state changes */
147         ast_cond_t cond;
148         /*! Lock used alongside the condition for state changes */
149         ast_mutex_t lock;
150         /*! The actual thread that is executing tasks */
151         pthread_t thread;
152         /*! A pointer to the threadpool. Needed to be able to execute tasks */
153         struct ast_threadpool *pool;
154         /*! The current state of the worker thread */
155         enum worker_state state;
156         /*! A boolean used to determine if an idle thread should become active */
157         int wake_up;
158         /*! Options for this threadpool */
159         struct ast_threadpool_options options;
160 };
161
162 /* Worker thread forward declarations. See definitions for documentation */
163 static int worker_thread_hash(const void *obj, int flags);
164 static int worker_thread_cmp(void *obj, void *arg, int flags);
165 static void worker_thread_destroy(void *obj);
166 static void worker_active(struct worker_thread *worker);
167 static void *worker_start(void *arg);
168 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
169 static int worker_thread_start(struct worker_thread *worker);
170 static int worker_idle(struct worker_thread *worker);
171 static int worker_set_state(struct worker_thread *worker, enum worker_state state);
172 static void worker_shutdown(struct worker_thread *worker);
173
174 /*!
175  * \brief Notify the threadpool listener that the state has changed.
176  *
177  * This notifies the threadpool listener via its state_changed callback.
178  * \param pool The threadpool whose state has changed
179  */
180 static void threadpool_send_state_changed(struct ast_threadpool *pool)
181 {
182         int active_size = ao2_container_count(pool->active_threads);
183         int idle_size = ao2_container_count(pool->idle_threads);
184
185         if (pool->listener && pool->listener->callbacks->state_changed) {
186                 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187         }
188 }
189
190 /*!
191  * \brief Struct used for queued operations involving worker state changes
192  */
193 struct thread_worker_pair {
194         /*! Threadpool that contains the worker whose state has changed */
195         struct ast_threadpool *pool;
196         /*! Worker whose state has changed */
197         struct worker_thread *worker;
198 };
199
200 /*!
201  * \brief Destructor for thread_worker_pair
202  */
203 static void thread_worker_pair_free(struct thread_worker_pair *pair)
204 {
205         ao2_ref(pair->worker, -1);
206         ast_free(pair);
207 }
208
209 /*!
210  * \brief Allocate and initialize a thread_worker_pair
211  * \param pool Threadpool to assign to the thread_worker_pair
212  * \param worker Worker thread to assign to the thread_worker_pair
213  */
214 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
215                 struct worker_thread *worker)
216 {
217         struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218         if (!pair) {
219                 return NULL;
220         }
221         pair->pool = pool;
222         ao2_ref(worker, +1);
223         pair->worker = worker;
224
225         return pair;
226 }
227
228 /*!
229  * \brief Move a worker thread from the active container to the idle container.
230  *
231  * This function is called from the threadpool's control taskprocessor thread.
232  * \param data A thread_worker_pair containing the threadpool and the worker to move.
233  * \return 0
234  */
235 static int queued_active_thread_idle(void *data)
236 {
237         struct thread_worker_pair *pair = data;
238
239         ao2_link(pair->pool->idle_threads, pair->worker);
240         ao2_unlink(pair->pool->active_threads, pair->worker);
241
242         threadpool_send_state_changed(pair->pool);
243
244         thread_worker_pair_free(pair);
245         return 0;
246 }
247
248 /*!
249  * \brief Queue a task to move a thread from the active list to the idle list
250  *
251  * This is called by a worker thread when it runs out of tasks to perform and
252  * goes idle.
253  * \param pool The threadpool to which the worker belongs
254  * \param worker The worker thread that has gone idle
255  */
256 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
257                 struct worker_thread *worker)
258 {
259         struct thread_worker_pair *pair;
260         SCOPED_AO2LOCK(lock, pool);
261
262         if (pool->shutting_down) {
263                 return;
264         }
265
266         pair = thread_worker_pair_alloc(pool, worker);
267         if (!pair) {
268                 return;
269         }
270
271         if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) {
272                 thread_worker_pair_free(pair);
273         }
274 }
275
276 /*!
277  * \brief Kill a zombie thread
278  *
279  * This runs from the threadpool's control taskprocessor thread.
280  *
281  * \param data A thread_worker_pair containing the threadpool and the zombie thread
282  * \return 0
283  */
284 static int queued_zombie_thread_dead(void *data)
285 {
286         struct thread_worker_pair *pair = data;
287
288         ao2_unlink(pair->pool->zombie_threads, pair->worker);
289         threadpool_send_state_changed(pair->pool);
290
291         thread_worker_pair_free(pair);
292         return 0;
293 }
294
295 /*!
296  * \brief Queue a task to kill a zombie thread
297  *
298  * This is called by a worker thread when it acknowledges that it is time for
299  * it to die.
300  */
301 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
302                 struct worker_thread *worker)
303 {
304         struct thread_worker_pair *pair;
305         SCOPED_AO2LOCK(lock, pool);
306
307         if (pool->shutting_down) {
308                 return;
309         }
310
311         pair = thread_worker_pair_alloc(pool, worker);
312         if (!pair) {
313                 return;
314         }
315
316         if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) {
317                 thread_worker_pair_free(pair);
318         }
319 }
320
321 static int queued_idle_thread_dead(void *data)
322 {
323         struct thread_worker_pair *pair = data;
324
325         ao2_unlink(pair->pool->idle_threads, pair->worker);
326         threadpool_send_state_changed(pair->pool);
327
328         thread_worker_pair_free(pair);
329         return 0;
330 }
331
332 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
333                 struct worker_thread *worker)
334 {
335         struct thread_worker_pair *pair;
336         SCOPED_AO2LOCK(lock, pool);
337
338         if (pool->shutting_down) {
339                 return;
340         }
341
342         pair = thread_worker_pair_alloc(pool, worker);
343         if (!pair) {
344                 return;
345         }
346
347         if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) {
348                 thread_worker_pair_free(pair);
349         }
350 }
351
352 /*!
353  * \brief Execute a task in the threadpool
354  *
355  * This is the function that worker threads call in order to execute tasks
356  * in the threadpool
357  *
358  * \param pool The pool to which the tasks belong.
359  * \retval 0 Either the pool has been shut down or there are no tasks.
360  * \retval 1 There are still tasks remaining in the pool.
361  */
362 static int threadpool_execute(struct ast_threadpool *pool)
363 {
364         ao2_lock(pool);
365         if (!pool->shutting_down) {
366                 ao2_unlock(pool);
367                 return ast_taskprocessor_execute(pool->tps);
368         }
369         ao2_unlock(pool);
370         return 0;
371 }
372
373 /*!
374  * \brief Destroy a threadpool's components.
375  *
376  * This is the destructor called automatically when the threadpool's
377  * reference count reaches zero. This is not to be confused with
378  * threadpool_destroy.
379  *
380  * By the time this actually gets called, most of the cleanup has already
381  * been done in the pool. The only thing left to do is to release the
382  * final reference to the threadpool listener.
383  *
384  * \param obj The pool to destroy
385  */
386 static void threadpool_destructor(void *obj)
387 {
388         struct ast_threadpool *pool = obj;
389         ao2_cleanup(pool->listener);
390 }
391
392 /*
393  * \brief Allocate a threadpool
394  *
395  * This is implemented as a taskprocessor listener's alloc callback. This
396  * is because the threadpool exists as the private data on a taskprocessor
397  * listener.
398  *
399  * \param name The name of the threadpool.
400  * \param options The options the threadpool uses.
401  * \retval NULL Could not initialize threadpool properly
402  * \retval non-NULL The newly-allocated threadpool
403  */
404 static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
405 {
406         RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407         struct ast_str *control_tps_name;
408
409         pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410         control_tps_name = ast_str_create(64);
411         if (!pool || !control_tps_name) {
412                 ast_free(control_tps_name);
413                 return NULL;
414         }
415
416         ast_str_set(&control_tps_name, 0, "%s-control", name);
417
418         pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419         ast_free(control_tps_name);
420         if (!pool->control_tps) {
421                 return NULL;
422         }
423         pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
424                 THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
425         if (!pool->active_threads) {
426                 return NULL;
427         }
428         pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
429                 THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
430         if (!pool->idle_threads) {
431                 return NULL;
432         }
433         pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
434                 THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
435         if (!pool->zombie_threads) {
436                 return NULL;
437         }
438         pool->options = *options;
439
440         ao2_ref(pool, +1);
441         return pool;
442 }
443
444 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
445 {
446         return 0;
447 }
448
449 /*!
450  * \brief helper used for queued task when tasks are pushed
451  */
452 struct task_pushed_data {
453         /*! Pool into which a task was pushed */
454         struct ast_threadpool *pool;
455         /*! Indicator of whether the pool had no tasks prior to the new task being added */
456         int was_empty;
457 };
458
459 /*!
460  * \brief Allocate and initialize a task_pushed_data
461  * \param pool The threadpool to set in the task_pushed_data
462  * \param was_empty The was_empty value to set in the task_pushed_data
463  * \retval NULL Unable to allocate task_pushed_data
464  * \retval non-NULL The newly-allocated task_pushed_data
465  */
466 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
467                 int was_empty)
468 {
469         struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470
471         if (!tpd) {
472                 return NULL;
473         }
474         tpd->pool = pool;
475         tpd->was_empty = was_empty;
476         return tpd;
477 }
478
479 /*!
480  * \brief Activate idle threads
481  *
482  * This function always returns CMP_MATCH because all workers that this
483  * function acts on need to be seen as matches so they are unlinked from the
484  * list of idle threads.
485  *
486  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
487  * \param obj The worker to activate
488  * \param arg The pool where the worker belongs
489  * \retval CMP_MATCH
490  */
491 static int activate_thread(void *obj, void *arg, int flags)
492 {
493         struct worker_thread *worker = obj;
494         struct ast_threadpool *pool = arg;
495
496         if (!ao2_link(pool->active_threads, worker)) {
497                 /* If we can't link the idle thread into the active container, then
498                  * we'll just leave the thread idle and not wake it up.
499                  */
500                 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
501                                 worker->id);
502                 return 0;
503         }
504
505         if (worker_set_state(worker, ALIVE)) {
506                 ast_debug(1, "Failed to activate thread %d. It is dead\n",
507                                 worker->id);
508                 /* The worker thread will no longer exist in the active threads or
509                  * idle threads container after this.
510                  */
511                 ao2_unlink(pool->active_threads, worker);
512         }
513
514         return CMP_MATCH;
515 }
516
517 /*!
518  * \brief Add threads to the threadpool
519  *
520  * This function is called from the threadpool's control taskprocessor thread.
521  * \param pool The pool that is expanding
522  * \delta The number of threads to add to the pool
523  */
524 static void grow(struct ast_threadpool *pool, int delta)
525 {
526         int i;
527
528         int current_size = ao2_container_count(pool->active_threads) +
529                 ao2_container_count(pool->idle_threads);
530
531         if (pool->options.max_size && current_size + delta > pool->options.max_size) {
532                 delta = pool->options.max_size - current_size;
533         }
534
535         ast_debug(3, "Increasing threadpool %s's size by %d\n",
536                         ast_taskprocessor_name(pool->tps), delta);
537
538         for (i = 0; i < delta; ++i) {
539                 struct worker_thread *worker = worker_thread_alloc(pool);
540                 if (!worker) {
541                         return;
542                 }
543                 if (ao2_link(pool->idle_threads, worker)) {
544                         if (worker_thread_start(worker)) {
545                                 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
546                                 ao2_unlink(pool->active_threads, worker);
547                         }
548                 } else {
549                         ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
550                 }
551                 ao2_ref(worker, -1);
552         }
553 }
554
555 /*!
556  * \brief Queued task called when tasks are pushed into the threadpool
557  *
558  * This function first calls into the threadpool's listener to let it know
559  * that a task has been pushed. It then wakes up all idle threads and moves
560  * them into the active thread container.
561  * \param data A task_pushed_data
562  * \return 0
563  */
564 static int queued_task_pushed(void *data)
565 {
566         struct task_pushed_data *tpd = data;
567         struct ast_threadpool *pool = tpd->pool;
568         int was_empty = tpd->was_empty;
569         unsigned int existing_active;
570
571         ast_free(tpd);
572
573         if (pool->listener && pool->listener->callbacks->task_pushed) {
574                 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
575         }
576
577         existing_active = ao2_container_count(pool->active_threads);
578
579         /* The first pass transitions any existing idle threads to be active, and
580          * will also remove any worker threads that have recently entered the dead
581          * state.
582          */
583         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
584                         activate_thread, pool);
585
586         /* If no idle threads could be transitioned to active grow the pool as permitted. */
587         if (ao2_container_count(pool->active_threads) == existing_active) {
588                 if (!pool->options.auto_increment) {
589                         return 0;
590                 }
591                 grow(pool, pool->options.auto_increment);
592                 /* An optional second pass transitions any newly added threads. */
593                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
594                                 activate_thread, pool);
595         }
596
597         threadpool_send_state_changed(pool);
598         return 0;
599 }
600
601 /*!
602  * \brief Taskprocessor listener callback called when a task is added
603  *
604  * The threadpool uses this opportunity to queue a task on its control taskprocessor
605  * in order to activate idle threads and notify the threadpool listener that the
606  * task has been pushed.
607  * \param listener The taskprocessor listener. The threadpool is the listener's private data
608  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
609  */
610 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
611                 int was_empty)
612 {
613         struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
614         struct task_pushed_data *tpd;
615         SCOPED_AO2LOCK(lock, pool);
616
617         if (pool->shutting_down) {
618                 return;
619         }
620
621         tpd = task_pushed_data_alloc(pool, was_empty);
622         if (!tpd) {
623                 return;
624         }
625
626         if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) {
627                 ast_free(tpd);
628         }
629 }
630
631 /*!
632  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
633  *
634  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
635  * \param data The pool that has become empty
636  * \return 0
637  */
638 static int queued_emptied(void *data)
639 {
640         struct ast_threadpool *pool = data;
641
642         /* We already checked for existence of this callback when this was queued */
643         pool->listener->callbacks->emptied(pool, pool->listener);
644         return 0;
645 }
646
647 /*!
648  * \brief Taskprocessor listener emptied callback
649  *
650  * The threadpool queues a task to let the threadpool listener know that
651  * the threadpool no longer contains any tasks.
652  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
653  */
654 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
655 {
656         struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
657         SCOPED_AO2LOCK(lock, pool);
658
659         if (pool->shutting_down) {
660                 return;
661         }
662
663         if (pool->listener && pool->listener->callbacks->emptied) {
664                 if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
665                         /* Nothing to do here but we need the check to keep the compiler happy. */
666                 }
667         }
668 }
669
670 /*!
671  * \brief Taskprocessor listener shutdown callback
672  *
673  * The threadpool will shut down and destroy all of its worker threads when
674  * this is called back. By the time this gets called, the taskprocessor's
675  * control taskprocessor has already been destroyed. Therefore there is no risk
676  * in outright destroying the worker threads here.
677  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
678  */
679 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
680 {
681         struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
682
683         if (pool->listener && pool->listener->callbacks->shutdown) {
684                 pool->listener->callbacks->shutdown(pool->listener);
685         }
686         ao2_cleanup(pool->active_threads);
687         ao2_cleanup(pool->idle_threads);
688         ao2_cleanup(pool->zombie_threads);
689         ao2_cleanup(pool);
690 }
691
692 /*!
693  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
694  */
695 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
696         .start = threadpool_tps_start,
697         .task_pushed = threadpool_tps_task_pushed,
698         .emptied = threadpool_tps_emptied,
699         .shutdown = threadpool_tps_shutdown,
700 };
701
702 /*!
703  * \brief ao2 callback to kill a set number of threads.
704  *
705  * Threads will be unlinked from the container as long as the
706  * counter has not reached zero. The counter is decremented with
707  * each thread that is removed.
708  * \param obj The worker thread up for possible destruction
709  * \param arg The counter
710  * \param flags Unused
711  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
712  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
713  */
714 static int kill_threads(void *obj, void *arg, int flags)
715 {
716         int *num_to_kill = arg;
717
718         if (*num_to_kill > 0) {
719                 --(*num_to_kill);
720                 return CMP_MATCH;
721         } else {
722                 return CMP_STOP;
723         }
724 }
725
726 /*!
727  * \brief ao2 callback to zombify a set number of threads.
728  *
729  * Threads will be zombified as long as the counter has not reached
730  * zero. The counter is decremented with each thread that is zombified.
731  *
732  * Zombifying a thread involves removing it from its current container,
733  * adding it to the zombie container, and changing the state of the
734  * worker to a zombie
735  *
736  * This callback is called from the threadpool control taskprocessor thread.
737  *
738  * \param obj The worker thread that may be zombified
739  * \param arg The pool to which the worker belongs
740  * \param data The counter
741  * \param flags Unused
742  * \retval CMP_MATCH The zombified thread should be removed from its current container
743  * \retval CMP_STOP Stop attempting to zombify threads
744  */
745 static int zombify_threads(void *obj, void *arg, void *data, int flags)
746 {
747         struct worker_thread *worker = obj;
748         struct ast_threadpool *pool = arg;
749         int *num_to_zombify = data;
750
751         if ((*num_to_zombify)-- > 0) {
752                 if (!ao2_link(pool->zombie_threads, worker)) {
753                         ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
754                         return 0;
755                 }
756                 worker_set_state(worker, ZOMBIE);
757                 return CMP_MATCH;
758         } else {
759                 return CMP_STOP;
760         }
761 }
762
763 /*!
764  * \brief Remove threads from the threadpool
765  *
766  * The preference is to kill idle threads. However, if there are
767  * more threads to remove than there are idle threads, then active
768  * threads will be zombified instead.
769  *
770  * This function is called from the threadpool control taskprocessor thread.
771  *
772  * \param pool The threadpool to remove threads from
773  * \param delta The number of threads to remove
774  */
775 static void shrink(struct ast_threadpool *pool, int delta)
776 {
777         /*
778          * Preference is to kill idle threads, but
779          * we'll move on to deactivating active threads
780          * if we have to
781          */
782         int idle_threads = ao2_container_count(pool->idle_threads);
783         int idle_threads_to_kill = MIN(delta, idle_threads);
784         int active_threads_to_zombify = delta - idle_threads_to_kill;
785
786         ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
787                         ast_taskprocessor_name(pool->tps));
788
789         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
790                         kill_threads, &idle_threads_to_kill);
791
792         ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
793                         ast_taskprocessor_name(pool->tps));
794
795         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
796                         zombify_threads, pool, &active_threads_to_zombify);
797 }
798
799 /*!
800  * \brief Helper struct used for queued operations that change the size of the threadpool
801  */
802 struct set_size_data {
803         /*! The pool whose size is to change */
804         struct ast_threadpool *pool;
805         /*! The requested new size of the pool */
806         unsigned int size;
807 };
808
809 /*!
810  * \brief Allocate and initialize a set_size_data
811  * \param pool The pool for the set_size_data
812  * \param size The size to store in the set_size_data
813  */
814 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
815                 unsigned int size)
816 {
817         struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
818         if (!ssd) {
819                 return NULL;
820         }
821
822         ssd->pool = pool;
823         ssd->size = size;
824         return ssd;
825 }
826
827 /*!
828  * \brief Change the size of the threadpool
829  *
830  * This can either result in shrinking or growing the threadpool depending
831  * on the new desired size and the current size.
832  *
833  * This function is run from the threadpool control taskprocessor thread
834  *
835  * \param data A set_size_data used for determining how to act
836  * \return 0
837  */
838 static int queued_set_size(void *data)
839 {
840         struct set_size_data *ssd = data;
841         struct ast_threadpool *pool = ssd->pool;
842         unsigned int num_threads = ssd->size;
843
844         /* We don't count zombie threads as being "live" when potentially resizing */
845         unsigned int current_size = ao2_container_count(pool->active_threads) +
846                         ao2_container_count(pool->idle_threads);
847
848         ast_free(ssd);
849
850         if (current_size == num_threads) {
851                 ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
852                           num_threads, current_size);
853                 return 0;
854         }
855
856         if (current_size < num_threads) {
857                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
858                                 activate_thread, pool);
859
860                 /* As the above may have altered the number of current threads update it */
861                 current_size = ao2_container_count(pool->active_threads) +
862                                 ao2_container_count(pool->idle_threads);
863                 grow(pool, num_threads - current_size);
864                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
865                                 activate_thread, pool);
866         } else {
867                 shrink(pool, current_size - num_threads);
868         }
869
870         threadpool_send_state_changed(pool);
871         return 0;
872 }
873
874 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
875 {
876         struct set_size_data *ssd;
877         SCOPED_AO2LOCK(lock, pool);
878
879         if (pool->shutting_down) {
880                 return;
881         }
882
883         ssd = set_size_data_alloc(pool, size);
884         if (!ssd) {
885                 return;
886         }
887
888         if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) {
889                 ast_free(ssd);
890         }
891 }
892
893 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
894                 const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
895 {
896         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
897         if (!listener) {
898                 return NULL;
899         }
900         listener->callbacks = callbacks;
901         listener->user_data = user_data;
902         return listener;
903 }
904
905 void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
906 {
907         return listener->user_data;
908 }
909
910 struct pool_options_pair {
911         struct ast_threadpool *pool;
912         struct ast_threadpool_options options;
913 };
914
915 struct ast_threadpool *ast_threadpool_create(const char *name,
916                 struct ast_threadpool_listener *listener,
917                 const struct ast_threadpool_options *options)
918 {
919         struct ast_taskprocessor *tps;
920         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921         RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922
923         pool = threadpool_alloc(name, options);
924         if (!pool) {
925                 return NULL;
926         }
927
928         tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
929         if (!tps_listener) {
930                 return NULL;
931         }
932
933         if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
934                 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
935                 return NULL;
936         }
937
938         tps = ast_taskprocessor_create_with_listener(name, tps_listener);
939         if (!tps) {
940                 return NULL;
941         }
942
943         pool->tps = tps;
944         if (listener) {
945                 ao2_ref(listener, +1);
946                 pool->listener = listener;
947         }
948         ast_threadpool_set_size(pool, pool->options.initial_size);
949         ao2_ref(pool, +1);
950         return pool;
951 }
952
953 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
954 {
955         SCOPED_AO2LOCK(lock, pool);
956         if (!pool->shutting_down) {
957                 return ast_taskprocessor_push(pool->tps, task, data);
958         }
959         return -1;
960 }
961
962 void ast_threadpool_shutdown(struct ast_threadpool *pool)
963 {
964         if (!pool) {
965                 return;
966         }
967         /* Shut down the taskprocessors and everything else just
968          * takes care of itself via the taskprocessor callbacks
969          */
970         ao2_lock(pool);
971         pool->shutting_down = 1;
972         ao2_unlock(pool);
973         ast_taskprocessor_unreference(pool->control_tps);
974         ast_taskprocessor_unreference(pool->tps);
975 }
976
977 /*!
978  * A monotonically increasing integer used for worker
979  * thread identification.
980  */
981 static int worker_id_counter;
982
983 static int worker_thread_hash(const void *obj, int flags)
984 {
985         const struct worker_thread *worker = obj;
986
987         return worker->id;
988 }
989
990 static int worker_thread_cmp(void *obj, void *arg, int flags)
991 {
992         struct worker_thread *worker1 = obj;
993         struct worker_thread *worker2 = arg;
994
995         return worker1->id == worker2->id ? CMP_MATCH : 0;
996 }
997
998 /*!
999  * \brief shut a worker thread down
1000  *
1001  * Set the worker dead and then wait for its thread
1002  * to finish executing.
1003  *
1004  * \param worker The worker thread to shut down
1005  */
1006 static void worker_shutdown(struct worker_thread *worker)
1007 {
1008         worker_set_state(worker, DEAD);
1009         if (worker->thread != AST_PTHREADT_NULL) {
1010                 pthread_join(worker->thread, NULL);
1011                 worker->thread = AST_PTHREADT_NULL;
1012         }
1013 }
1014
1015 /*!
1016  * \brief Worker thread destructor
1017  *
1018  * Called automatically when refcount reaches 0. Shuts
1019  * down the worker thread and destroys its component
1020  * parts
1021  */
1022 static void worker_thread_destroy(void *obj)
1023 {
1024         struct worker_thread *worker = obj;
1025         ast_debug(3, "Destroying worker thread %d\n", worker->id);
1026         worker_shutdown(worker);
1027         ast_mutex_destroy(&worker->lock);
1028         ast_cond_destroy(&worker->cond);
1029 }
1030
1031 /*!
1032  * \brief start point for worker threads
1033  *
1034  * Worker threads start in the active state but may
1035  * immediately go idle if there is no work to be
1036  * done
1037  *
1038  * \param arg The worker thread
1039  * \retval NULL
1040  */
1041 static void *worker_start(void *arg)
1042 {
1043         struct worker_thread *worker = arg;
1044         enum worker_state saved_state;
1045
1046         if (worker->options.thread_start) {
1047                 worker->options.thread_start();
1048         }
1049
1050         ast_mutex_lock(&worker->lock);
1051         while (worker_idle(worker)) {
1052                 ast_mutex_unlock(&worker->lock);
1053                 worker_active(worker);
1054                 ast_mutex_lock(&worker->lock);
1055                 if (worker->state != ALIVE) {
1056                         break;
1057                 }
1058                 threadpool_active_thread_idle(worker->pool, worker);
1059         }
1060         saved_state = worker->state;
1061         ast_mutex_unlock(&worker->lock);
1062
1063         /* Reaching this portion means the thread is
1064          * on death's door. It may have been killed while
1065          * it was idle, in which case it can just die
1066          * peacefully. If it's a zombie, though, then
1067          * it needs to let the pool know so
1068          * that the thread can be removed from the
1069          * list of zombie threads.
1070          */
1071         if (saved_state == ZOMBIE) {
1072                 threadpool_zombie_thread_dead(worker->pool, worker);
1073         }
1074
1075         if (worker->options.thread_end) {
1076                 worker->options.thread_end();
1077         }
1078         return NULL;
1079 }
1080
1081 /*!
1082  * \brief Allocate and initialize a new worker thread
1083  *
1084  * This will create, initialize, and start the thread.
1085  *
1086  * \param pool The threadpool to which the worker will be added
1087  * \retval NULL Failed to allocate or start the worker thread
1088  * \retval non-NULL The newly-created worker thread
1089  */
1090 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
1091 {
1092         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1093         if (!worker) {
1094                 return NULL;
1095         }
1096         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
1097         ast_mutex_init(&worker->lock);
1098         ast_cond_init(&worker->cond, NULL);
1099         worker->pool = pool;
1100         worker->thread = AST_PTHREADT_NULL;
1101         worker->state = ALIVE;
1102         worker->options = pool->options;
1103         return worker;
1104 }
1105
1106 static int worker_thread_start(struct worker_thread *worker)
1107 {
1108         return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1109 }
1110
1111 /*!
1112  * \brief Active loop for worker threads
1113  *
1114  * The worker will stay in this loop for its lifetime,
1115  * executing tasks as they become available. If there
1116  * are no tasks currently available, then the thread
1117  * will go idle.
1118  *
1119  * \param worker The worker thread executing tasks.
1120  */
1121 static void worker_active(struct worker_thread *worker)
1122 {
1123         int alive;
1124
1125         /* The following is equivalent to
1126          *
1127          * while (threadpool_execute(worker->pool));
1128          *
1129          * However, reviewers have suggested in the past
1130          * doing that can cause optimizers to (wrongly)
1131          * optimize the code away.
1132          */
1133         do {
1134                 alive = threadpool_execute(worker->pool);
1135         } while (alive);
1136 }
1137
1138 /*!
1139  * \brief Idle function for worker threads
1140  *
1141  * The worker waits here until it gets told by the threadpool
1142  * to wake up.
1143  *
1144  * worker is locked before entering this function.
1145  *
1146  * \param worker The idle worker
1147  * \retval 0 The thread is being woken up so that it can conclude.
1148  * \retval non-zero The thread is being woken up to do more work.
1149  */
1150 static int worker_idle(struct worker_thread *worker)
1151 {
1152         struct timeval start = ast_tvnow();
1153         struct timespec end = {
1154                 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1155                 .tv_nsec = start.tv_usec * 1000,
1156         };
1157         while (!worker->wake_up) {
1158                 if (worker->options.idle_timeout <= 0) {
1159                         ast_cond_wait(&worker->cond, &worker->lock);
1160                 } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1161                         break;
1162                 }
1163         }
1164
1165         if (!worker->wake_up) {
1166                 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1167                 threadpool_idle_thread_dead(worker->pool, worker);
1168                 worker->state = DEAD;
1169         }
1170         worker->wake_up = 0;
1171         return worker->state == ALIVE;
1172 }
1173
1174 /*!
1175  * \brief Change a worker's state
1176  *
1177  * The threadpool calls into this function in order to let a worker know
1178  * how it should proceed.
1179  *
1180  * \retval -1 failure (state transition not permitted)
1181  * \retval 0 success
1182  */
1183 static int worker_set_state(struct worker_thread *worker, enum worker_state state)
1184 {
1185         SCOPED_MUTEX(lock, &worker->lock);
1186
1187         switch (state) {
1188         case ALIVE:
1189                 /* This can occur due to a race condition between being told to go active
1190                  * and an idle timeout happening.
1191                  */
1192                 if (worker->state == DEAD) {
1193                         return -1;
1194                 }
1195                 ast_assert(worker->state != ZOMBIE);
1196                 break;
1197         case DEAD:
1198                 break;
1199         case ZOMBIE:
1200                 ast_assert(worker->state != DEAD);
1201                 break;
1202         }
1203
1204         worker->state = state;
1205         worker->wake_up = 1;
1206         ast_cond_signal(&worker->cond);
1207
1208         return 0;
1209 }
1210
1211 /*! Serializer group shutdown control object. */
1212 struct ast_serializer_shutdown_group {
1213         /*! Shutdown thread waits on this conditional. */
1214         ast_cond_t cond;
1215         /*! Count of serializers needing to shutdown. */
1216         int count;
1217 };
1218
1219 static void serializer_shutdown_group_dtor(void *vdoomed)
1220 {
1221         struct ast_serializer_shutdown_group *doomed = vdoomed;
1222
1223         ast_cond_destroy(&doomed->cond);
1224 }
1225
1226 struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
1227 {
1228         struct ast_serializer_shutdown_group *shutdown_group;
1229
1230         shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
1231         if (!shutdown_group) {
1232                 return NULL;
1233         }
1234         ast_cond_init(&shutdown_group->cond, NULL);
1235         return shutdown_group;
1236 }
1237
1238 int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
1239 {
1240         int remaining;
1241         ast_mutex_t *lock;
1242
1243         if (!shutdown_group) {
1244                 return 0;
1245         }
1246
1247         lock = ao2_object_get_lockaddr(shutdown_group);
1248         ast_assert(lock != NULL);
1249
1250         ao2_lock(shutdown_group);
1251         if (timeout) {
1252                 struct timeval start;
1253                 struct timespec end;
1254
1255                 start = ast_tvnow();
1256                 end.tv_sec = start.tv_sec + timeout;
1257                 end.tv_nsec = start.tv_usec * 1000;
1258                 while (shutdown_group->count) {
1259                         if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
1260                                 /* Error or timed out waiting for the count to reach zero. */
1261                                 break;
1262                         }
1263                 }
1264         } else {
1265                 while (shutdown_group->count) {
1266                         if (ast_cond_wait(&shutdown_group->cond, lock)) {
1267                                 /* Error */
1268                                 break;
1269                         }
1270                 }
1271         }
1272         remaining = shutdown_group->count;
1273         ao2_unlock(shutdown_group);
1274         return remaining;
1275 }
1276
1277 /*!
1278  * \internal
1279  * \brief Increment the number of serializer members in the group.
1280  * \since 13.5.0
1281  *
1282  * \param shutdown_group Group shutdown controller.
1283  *
1284  * \return Nothing
1285  */
1286 static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
1287 {
1288         ao2_lock(shutdown_group);
1289         ++shutdown_group->count;
1290         ao2_unlock(shutdown_group);
1291 }
1292
1293 /*!
1294  * \internal
1295  * \brief Decrement the number of serializer members in the group.
1296  * \since 13.5.0
1297  *
1298  * \param shutdown_group Group shutdown controller.
1299  *
1300  * \return Nothing
1301  */
1302 static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
1303 {
1304         ao2_lock(shutdown_group);
1305         --shutdown_group->count;
1306         if (!shutdown_group->count) {
1307                 ast_cond_signal(&shutdown_group->cond);
1308         }
1309         ao2_unlock(shutdown_group);
1310 }
1311
1312 struct serializer {
1313         /*! Threadpool the serializer will use to process the jobs. */
1314         struct ast_threadpool *pool;
1315         /*! Which group will wait for this serializer to shutdown. */
1316         struct ast_serializer_shutdown_group *shutdown_group;
1317 };
1318
1319 static void serializer_dtor(void *obj)
1320 {
1321         struct serializer *ser = obj;
1322
1323         ao2_cleanup(ser->pool);
1324         ser->pool = NULL;
1325         ao2_cleanup(ser->shutdown_group);
1326         ser->shutdown_group = NULL;
1327 }
1328
1329 static struct serializer *serializer_create(struct ast_threadpool *pool,
1330         struct ast_serializer_shutdown_group *shutdown_group)
1331 {
1332         struct serializer *ser;
1333
1334         ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1335         if (!ser) {
1336                 return NULL;
1337         }
1338         ao2_ref(pool, +1);
1339         ser->pool = pool;
1340         ser->shutdown_group = ao2_bump(shutdown_group);
1341         return ser;
1342 }
1343
1344 AST_THREADSTORAGE_RAW(current_serializer);
1345
1346 static int execute_tasks(void *data)
1347 {
1348         struct ast_taskprocessor *tps = data;
1349
1350         ast_threadstorage_set_ptr(&current_serializer, tps);
1351         while (ast_taskprocessor_execute(tps)) {
1352                 /* No-op */
1353         }
1354         ast_threadstorage_set_ptr(&current_serializer, NULL);
1355
1356         ast_taskprocessor_unreference(tps);
1357         return 0;
1358 }
1359
1360 static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
1361 {
1362         if (was_empty) {
1363                 struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1364                 struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
1365
1366                 if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1367                         ast_taskprocessor_unreference(tps);
1368                 }
1369         }
1370 }
1371
1372 static int serializer_start(struct ast_taskprocessor_listener *listener)
1373 {
1374         /* No-op */
1375         return 0;
1376 }
1377
1378 static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
1379 {
1380         struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1381
1382         if (ser->shutdown_group) {
1383                 serializer_shutdown_group_dec(ser->shutdown_group);
1384         }
1385         ao2_cleanup(ser);
1386 }
1387
1388 static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
1389         .task_pushed = serializer_task_pushed,
1390         .start = serializer_start,
1391         .shutdown = serializer_shutdown,
1392 };
1393
1394 struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
1395 {
1396         return ast_threadstorage_get_ptr(&current_serializer);
1397 }
1398
1399 struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
1400         struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
1401 {
1402         struct serializer *ser;
1403         struct ast_taskprocessor_listener *listener;
1404         struct ast_taskprocessor *tps;
1405
1406         ser = serializer_create(pool, shutdown_group);
1407         if (!ser) {
1408                 return NULL;
1409         }
1410
1411         listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
1412         if (!listener) {
1413                 ao2_ref(ser, -1);
1414                 return NULL;
1415         }
1416
1417         tps = ast_taskprocessor_create_with_listener(name, listener);
1418         if (!tps) {
1419                 /* ser ref transferred to listener but not cleaned without tps */
1420                 ao2_ref(ser, -1);
1421         } else if (shutdown_group) {
1422                 serializer_shutdown_group_inc(shutdown_group);
1423         }
1424
1425         ao2_ref(listener, -1);
1426         return tps;
1427 }
1428
1429 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
1430 {
1431         return ast_threadpool_serializer_group(name, pool, NULL);
1432 }
1433
1434 long ast_threadpool_queue_size(struct ast_threadpool *pool)
1435 {
1436         return ast_taskprocessor_size(pool->tps);
1437 }