BuildSystem: Remove unused variables.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26
27 /* Needs to stay prime if increased */
28 #define THREAD_BUCKETS 89
29
30 /*!
31  * \brief An opaque threadpool structure
32  *
33  * A threadpool is a collection of threads that execute
34  * tasks from a common queue.
35  */
36 struct ast_threadpool {
37         /*! Threadpool listener */
38         struct ast_threadpool_listener *listener;
39         /*!
40          * \brief The container of active threads.
41          * Active threads are those that are currently running tasks
42          */
43         struct ao2_container *active_threads;
44         /*!
45          * \brief The container of idle threads.
46          * Idle threads are those that are currenly waiting to run tasks
47          */
48         struct ao2_container *idle_threads;
49         /*!
50          * \brief The container of zombie threads.
51          * Zombie threads may be running tasks, but they are scheduled to die soon
52          */
53         struct ao2_container *zombie_threads;
54         /*!
55          * \brief The main taskprocessor
56          *
57          * Tasks that are queued in this taskprocessor are
58          * doled out to the worker threads. Worker threads that
59          * execute tasks from the threadpool are executing tasks
60          * in this taskprocessor.
61          *
62          * The threadpool itself is actually the private data for
63          * this taskprocessor's listener. This way, as taskprocessor
64          * changes occur, the threadpool can alert its listeners
65          * appropriately.
66          */
67         struct ast_taskprocessor *tps;
68         /*!
69          * \brief The control taskprocessor
70          *
71          * This is a standard taskprocessor that uses the default
72          * taskprocessor listener. In other words, all tasks queued to
73          * this taskprocessor have a single thread that executes the
74          * tasks.
75          *
76          * All tasks that modify the state of the threadpool and all tasks
77          * that call out to threadpool listeners are pushed to this
78          * taskprocessor.
79          *
80          * For instance, when the threadpool changes sizes, a task is put
81          * into this taskprocessor to do so. When it comes time to tell the
82          * threadpool listener that worker threads have changed state,
83          * the task is placed in this taskprocessor.
84          *
85          * This is done for three main reasons
86          * 1) It ensures that listeners are given an accurate portrayal
87          * of the threadpool's current state. In other words, when a listener
88          * gets told a count of active, idle and zombie threads, it does not
89          * need to worry that internal state of the threadpool might be different
90          * from what it has been told.
91          * 2) It minimizes the locking required in both the threadpool and in
92          * threadpool listener's callbacks.
93          * 3) It ensures that listener callbacks are called in the same order
94          * that the threadpool had its state change.
95          */
96         struct ast_taskprocessor *control_tps;
97         /*! True if the threadpool is in the process of shutting down */
98         int shutting_down;
99         /*! Threadpool-specific options */
100         struct ast_threadpool_options options;
101 };
102
103 /*!
104  * \brief listener for a threadpool
105  *
106  * The listener is notified of changes in a threadpool. It can
107  * react by doing things like increasing the number of threads
108  * in the pool
109  */
110 struct ast_threadpool_listener {
111         /*! Callbacks called by the threadpool */
112         const struct ast_threadpool_listener_callbacks *callbacks;
113         /*! User data for the listener */
114         void *user_data;
115 };
116
117 /*!
118  * \brief states for worker threads
119  */
120 enum worker_state {
121         /*! The worker is either active or idle */
122         ALIVE,
123         /*!
124          * The worker has been asked to shut down but
125          * may still be in the process of executing tasks.
126          * This transition happens when the threadpool needs
127          * to shrink and needs to kill active threads in order
128          * to do so.
129          */
130         ZOMBIE,
131         /*!
132          * The worker has been asked to shut down. Typically
133          * only idle threads go to this state directly, but
134          * active threads may go straight to this state when
135          * the threadpool is shut down.
136          */
137         DEAD,
138 };
139
140 /*!
141  * A thread that executes threadpool tasks
142  */
143 struct worker_thread {
144         /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
145         int id;
146         /*! Condition used in conjunction with state changes */
147         ast_cond_t cond;
148         /*! Lock used alongside the condition for state changes */
149         ast_mutex_t lock;
150         /*! The actual thread that is executing tasks */
151         pthread_t thread;
152         /*! A pointer to the threadpool. Needed to be able to execute tasks */
153         struct ast_threadpool *pool;
154         /*! The current state of the worker thread */
155         enum worker_state state;
156         /*! A boolean used to determine if an idle thread should become active */
157         int wake_up;
158         /*! Options for this threadpool */
159         struct ast_threadpool_options options;
160 };
161
162 /* Worker thread forward declarations. See definitions for documentation */
163 static int worker_thread_hash(const void *obj, int flags);
164 static int worker_thread_cmp(void *obj, void *arg, int flags);
165 static void worker_thread_destroy(void *obj);
166 static void worker_active(struct worker_thread *worker);
167 static void *worker_start(void *arg);
168 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
169 static int worker_thread_start(struct worker_thread *worker);
170 static int worker_idle(struct worker_thread *worker);
171 static int worker_set_state(struct worker_thread *worker, enum worker_state state);
172 static void worker_shutdown(struct worker_thread *worker);
173
174 /*!
175  * \brief Notify the threadpool listener that the state has changed.
176  *
177  * This notifies the threadpool listener via its state_changed callback.
178  * \param pool The threadpool whose state has changed
179  */
180 static void threadpool_send_state_changed(struct ast_threadpool *pool)
181 {
182         int active_size = ao2_container_count(pool->active_threads);
183         int idle_size = ao2_container_count(pool->idle_threads);
184
185         if (pool->listener && pool->listener->callbacks->state_changed) {
186                 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187         }
188 }
189
190 /*!
191  * \brief Struct used for queued operations involving worker state changes
192  */
193 struct thread_worker_pair {
194         /*! Threadpool that contains the worker whose state has changed */
195         struct ast_threadpool *pool;
196         /*! Worker whose state has changed */
197         struct worker_thread *worker;
198 };
199
200 /*!
201  * \brief Destructor for thread_worker_pair
202  */
203 static void thread_worker_pair_destructor(void *obj)
204 {
205         struct thread_worker_pair *pair = obj;
206         ao2_ref(pair->worker, -1);
207 }
208
209 /*!
210  * \brief Allocate and initialize a thread_worker_pair
211  * \param pool Threadpool to assign to the thread_worker_pair
212  * \param worker Worker thread to assign to the thread_worker_pair
213  */
214 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
215                 struct worker_thread *worker)
216 {
217         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
218         if (!pair) {
219                 return NULL;
220         }
221         pair->pool = pool;
222         ao2_ref(worker, +1);
223         pair->worker = worker;
224         return pair;
225 }
226
227 /*!
228  * \brief Move a worker thread from the active container to the idle container.
229  *
230  * This function is called from the threadpool's control taskprocessor thread.
231  * \param data A thread_worker_pair containing the threadpool and the worker to move.
232  * \return 0
233  */
234 static int queued_active_thread_idle(void *data)
235 {
236         struct thread_worker_pair *pair = data;
237
238         ao2_link(pair->pool->idle_threads, pair->worker);
239         ao2_unlink(pair->pool->active_threads, pair->worker);
240
241         threadpool_send_state_changed(pair->pool);
242
243         ao2_ref(pair, -1);
244         return 0;
245 }
246
247 /*!
248  * \brief Queue a task to move a thread from the active list to the idle list
249  *
250  * This is called by a worker thread when it runs out of tasks to perform and
251  * goes idle.
252  * \param pool The threadpool to which the worker belongs
253  * \param worker The worker thread that has gone idle
254  */
255 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
256                 struct worker_thread *worker)
257 {
258         struct thread_worker_pair *pair;
259         SCOPED_AO2LOCK(lock, pool);
260         if (pool->shutting_down) {
261                 return;
262         }
263         pair = thread_worker_pair_alloc(pool, worker);
264         if (!pair) {
265                 return;
266         }
267         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
268 }
269
270 /*!
271  * \brief Kill a zombie thread
272  *
273  * This runs from the threadpool's control taskprocessor thread.
274  *
275  * \param data A thread_worker_pair containing the threadpool and the zombie thread
276  * \return 0
277  */
278 static int queued_zombie_thread_dead(void *data)
279 {
280         struct thread_worker_pair *pair = data;
281
282         ao2_unlink(pair->pool->zombie_threads, pair->worker);
283         threadpool_send_state_changed(pair->pool);
284
285         ao2_ref(pair, -1);
286         return 0;
287 }
288
289 /*!
290  * \brief Queue a task to kill a zombie thread
291  *
292  * This is called by a worker thread when it acknowledges that it is time for
293  * it to die.
294  */
295 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
296                 struct worker_thread *worker)
297 {
298         struct thread_worker_pair *pair;
299         SCOPED_AO2LOCK(lock, pool);
300         if (pool->shutting_down) {
301                 return;
302         }
303         pair = thread_worker_pair_alloc(pool, worker);
304         if (!pair) {
305                 return;
306         }
307         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
308 }
309
310 static int queued_idle_thread_dead(void *data)
311 {
312         struct thread_worker_pair *pair = data;
313
314         ao2_unlink(pair->pool->idle_threads, pair->worker);
315         threadpool_send_state_changed(pair->pool);
316
317         ao2_ref(pair, -1);
318         return 0;
319 }
320
321 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
322                 struct worker_thread *worker)
323 {
324         struct thread_worker_pair *pair;
325         SCOPED_AO2LOCK(lock, pool);
326         if (pool->shutting_down) {
327                 return;
328         }
329         pair = thread_worker_pair_alloc(pool, worker);
330         if (!pair) {
331                 return;
332         }
333         ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
334 }
335
336 /*!
337  * \brief Execute a task in the threadpool
338  *
339  * This is the function that worker threads call in order to execute tasks
340  * in the threadpool
341  *
342  * \param pool The pool to which the tasks belong.
343  * \retval 0 Either the pool has been shut down or there are no tasks.
344  * \retval 1 There are still tasks remaining in the pool.
345  */
346 static int threadpool_execute(struct ast_threadpool *pool)
347 {
348         ao2_lock(pool);
349         if (!pool->shutting_down) {
350                 ao2_unlock(pool);
351                 return ast_taskprocessor_execute(pool->tps);
352         }
353         ao2_unlock(pool);
354         return 0;
355 }
356
357 /*!
358  * \brief Destroy a threadpool's components.
359  *
360  * This is the destructor called automatically when the threadpool's
361  * reference count reaches zero. This is not to be confused with
362  * threadpool_destroy.
363  *
364  * By the time this actually gets called, most of the cleanup has already
365  * been done in the pool. The only thing left to do is to release the
366  * final reference to the threadpool listener.
367  *
368  * \param obj The pool to destroy
369  */
370 static void threadpool_destructor(void *obj)
371 {
372         struct ast_threadpool *pool = obj;
373         ao2_cleanup(pool->listener);
374 }
375
376 /*
377  * \brief Allocate a threadpool
378  *
379  * This is implemented as a taskprocessor listener's alloc callback. This
380  * is because the threadpool exists as the private data on a taskprocessor
381  * listener.
382  *
383  * \param name The name of the threadpool.
384  * \param options The options the threadpool uses.
385  * \retval NULL Could not initialize threadpool properly
386  * \retval non-NULL The newly-allocated threadpool
387  */
388 static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
389 {
390         RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
391         struct ast_str *control_tps_name;
392
393         pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
394         control_tps_name = ast_str_create(64);
395         if (!pool || !control_tps_name) {
396                 ast_free(control_tps_name);
397                 return NULL;
398         }
399
400         ast_str_set(&control_tps_name, 0, "%s-control", name);
401
402         pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
403         ast_free(control_tps_name);
404         if (!pool->control_tps) {
405                 return NULL;
406         }
407         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
408         if (!pool->active_threads) {
409                 return NULL;
410         }
411         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
412         if (!pool->idle_threads) {
413                 return NULL;
414         }
415         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
416         if (!pool->zombie_threads) {
417                 return NULL;
418         }
419         pool->options = *options;
420
421         ao2_ref(pool, +1);
422         return pool;
423 }
424
425 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
426 {
427         return 0;
428 }
429
430 /*!
431  * \brief helper used for queued task when tasks are pushed
432  */
433 struct task_pushed_data {
434         /*! Pool into which a task was pushed */
435         struct ast_threadpool *pool;
436         /*! Indicator of whether the pool had no tasks prior to the new task being added */
437         int was_empty;
438 };
439
440 /*!
441  * \brief Allocate and initialize a task_pushed_data
442  * \param pool The threadpool to set in the task_pushed_data
443  * \param was_empty The was_empty value to set in the task_pushed_data
444  * \retval NULL Unable to allocate task_pushed_data
445  * \retval non-NULL The newly-allocated task_pushed_data
446  */
447 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
448                 int was_empty)
449 {
450         struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
451
452         if (!tpd) {
453                 return NULL;
454         }
455         tpd->pool = pool;
456         tpd->was_empty = was_empty;
457         return tpd;
458 }
459
460 /*!
461  * \brief Activate idle threads
462  *
463  * This function always returns CMP_MATCH because all workers that this
464  * function acts on need to be seen as matches so they are unlinked from the
465  * list of idle threads.
466  *
467  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
468  * \param obj The worker to activate
469  * \param arg The pool where the worker belongs
470  * \retval CMP_MATCH
471  */
472 static int activate_thread(void *obj, void *arg, int flags)
473 {
474         struct worker_thread *worker = obj;
475         struct ast_threadpool *pool = arg;
476
477         if (!ao2_link(pool->active_threads, worker)) {
478                 /* If we can't link the idle thread into the active container, then
479                  * we'll just leave the thread idle and not wake it up.
480                  */
481                 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
482                                 worker->id);
483                 return 0;
484         }
485
486         if (worker_set_state(worker, ALIVE)) {
487                 ast_debug(1, "Failed to activate thread %d. It is dead\n",
488                                 worker->id);
489                 /* The worker thread will no longer exist in the active threads or
490                  * idle threads container after this.
491                  */
492                 ao2_unlink(pool->active_threads, worker);
493         }
494
495         return CMP_MATCH;
496 }
497
498 /*!
499  * \brief Add threads to the threadpool
500  *
501  * This function is called from the threadpool's control taskprocessor thread.
502  * \param pool The pool that is expanding
503  * \delta The number of threads to add to the pool
504  */
505 static void grow(struct ast_threadpool *pool, int delta)
506 {
507         int i;
508
509         int current_size = ao2_container_count(pool->active_threads) +
510                 ao2_container_count(pool->idle_threads);
511
512         if (pool->options.max_size && current_size + delta > pool->options.max_size) {
513                 delta = pool->options.max_size - current_size;
514         }
515
516         ast_debug(3, "Increasing threadpool %s's size by %d\n",
517                         ast_taskprocessor_name(pool->tps), delta);
518
519         for (i = 0; i < delta; ++i) {
520                 struct worker_thread *worker = worker_thread_alloc(pool);
521                 if (!worker) {
522                         return;
523                 }
524                 if (ao2_link(pool->idle_threads, worker)) {
525                         if (worker_thread_start(worker)) {
526                                 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
527                                 ao2_unlink(pool->active_threads, worker);
528                         }
529                 } else {
530                         ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
531                 }
532                 ao2_ref(worker, -1);
533         }
534 }
535
536 /*!
537  * \brief Queued task called when tasks are pushed into the threadpool
538  *
539  * This function first calls into the threadpool's listener to let it know
540  * that a task has been pushed. It then wakes up all idle threads and moves
541  * them into the active thread container.
542  * \param data A task_pushed_data
543  * \return 0
544  */
545 static int queued_task_pushed(void *data)
546 {
547         struct task_pushed_data *tpd = data;
548         struct ast_threadpool *pool = tpd->pool;
549         int was_empty = tpd->was_empty;
550         unsigned int existing_active;
551
552         if (pool->listener && pool->listener->callbacks->task_pushed) {
553                 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
554         }
555
556         existing_active = ao2_container_count(pool->active_threads);
557
558         /* The first pass transitions any existing idle threads to be active, and
559          * will also remove any worker threads that have recently entered the dead
560          * state.
561          */
562         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
563                         activate_thread, pool);
564
565         /* If no idle threads could be transitioned to active grow the pool as permitted. */
566         if (ao2_container_count(pool->active_threads) == existing_active) {
567                 if (!pool->options.auto_increment) {
568                         ao2_ref(tpd, -1);
569                         return 0;
570                 }
571                 grow(pool, pool->options.auto_increment);
572                 /* An optional second pass transitions any newly added threads. */
573                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
574                                 activate_thread, pool);
575         }
576
577         threadpool_send_state_changed(pool);
578         ao2_ref(tpd, -1);
579         return 0;
580 }
581
582 /*!
583  * \brief Taskprocessor listener callback called when a task is added
584  *
585  * The threadpool uses this opportunity to queue a task on its control taskprocessor
586  * in order to activate idle threads and notify the threadpool listener that the
587  * task has been pushed.
588  * \param listener The taskprocessor listener. The threadpool is the listener's private data
589  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
590  */
591 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
592                 int was_empty)
593 {
594         struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
595         struct task_pushed_data *tpd;
596         SCOPED_AO2LOCK(lock, pool);
597
598         if (pool->shutting_down) {
599                 return;
600         }
601         tpd = task_pushed_data_alloc(pool, was_empty);
602         if (!tpd) {
603                 return;
604         }
605
606         ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
607 }
608
609 /*!
610  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
611  *
612  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
613  * \param data The pool that has become empty
614  * \return 0
615  */
616 static int queued_emptied(void *data)
617 {
618         struct ast_threadpool *pool = data;
619
620         /* We already checked for existence of this callback when this was queued */
621         pool->listener->callbacks->emptied(pool, pool->listener);
622         return 0;
623 }
624
625 /*!
626  * \brief Taskprocessor listener emptied callback
627  *
628  * The threadpool queues a task to let the threadpool listener know that
629  * the threadpool no longer contains any tasks.
630  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
631  */
632 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
633 {
634         struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
635         SCOPED_AO2LOCK(lock, pool);
636
637         if (pool->shutting_down) {
638                 return;
639         }
640
641         if (pool->listener && pool->listener->callbacks->emptied) {
642                 ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
643         }
644 }
645
646 /*!
647  * \brief Taskprocessor listener shutdown callback
648  *
649  * The threadpool will shut down and destroy all of its worker threads when
650  * this is called back. By the time this gets called, the taskprocessor's
651  * control taskprocessor has already been destroyed. Therefore there is no risk
652  * in outright destroying the worker threads here.
653  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
654  */
655 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
656 {
657         struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
658
659         if (pool->listener && pool->listener->callbacks->shutdown) {
660                 pool->listener->callbacks->shutdown(pool->listener);
661         }
662         ao2_cleanup(pool->active_threads);
663         ao2_cleanup(pool->idle_threads);
664         ao2_cleanup(pool->zombie_threads);
665         ao2_cleanup(pool);
666 }
667
668 /*!
669  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
670  */
671 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
672         .start = threadpool_tps_start,
673         .task_pushed = threadpool_tps_task_pushed,
674         .emptied = threadpool_tps_emptied,
675         .shutdown = threadpool_tps_shutdown,
676 };
677
678 /*!
679  * \brief ao2 callback to kill a set number of threads.
680  *
681  * Threads will be unlinked from the container as long as the
682  * counter has not reached zero. The counter is decremented with
683  * each thread that is removed.
684  * \param obj The worker thread up for possible destruction
685  * \param arg The counter
686  * \param flags Unused
687  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
688  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
689  */
690 static int kill_threads(void *obj, void *arg, int flags)
691 {
692         int *num_to_kill = arg;
693
694         if (*num_to_kill > 0) {
695                 --(*num_to_kill);
696                 return CMP_MATCH;
697         } else {
698                 return CMP_STOP;
699         }
700 }
701
702 /*!
703  * \brief ao2 callback to zombify a set number of threads.
704  *
705  * Threads will be zombified as long as the counter has not reached
706  * zero. The counter is decremented with each thread that is zombified.
707  *
708  * Zombifying a thread involves removing it from its current container,
709  * adding it to the zombie container, and changing the state of the
710  * worker to a zombie
711  *
712  * This callback is called from the threadpool control taskprocessor thread.
713  *
714  * \param obj The worker thread that may be zombified
715  * \param arg The pool to which the worker belongs
716  * \param data The counter
717  * \param flags Unused
718  * \retval CMP_MATCH The zombified thread should be removed from its current container
719  * \retval CMP_STOP Stop attempting to zombify threads
720  */
721 static int zombify_threads(void *obj, void *arg, void *data, int flags)
722 {
723         struct worker_thread *worker = obj;
724         struct ast_threadpool *pool = arg;
725         int *num_to_zombify = data;
726
727         if ((*num_to_zombify)-- > 0) {
728                 if (!ao2_link(pool->zombie_threads, worker)) {
729                         ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
730                         return 0;
731                 }
732                 worker_set_state(worker, ZOMBIE);
733                 return CMP_MATCH;
734         } else {
735                 return CMP_STOP;
736         }
737 }
738
739 /*!
740  * \brief Remove threads from the threadpool
741  *
742  * The preference is to kill idle threads. However, if there are
743  * more threads to remove than there are idle threads, then active
744  * threads will be zombified instead.
745  *
746  * This function is called from the threadpool control taskprocessor thread.
747  *
748  * \param pool The threadpool to remove threads from
749  * \param delta The number of threads to remove
750  */
751 static void shrink(struct ast_threadpool *pool, int delta)
752 {
753         /*
754          * Preference is to kill idle threads, but
755          * we'll move on to deactivating active threads
756          * if we have to
757          */
758         int idle_threads = ao2_container_count(pool->idle_threads);
759         int idle_threads_to_kill = MIN(delta, idle_threads);
760         int active_threads_to_zombify = delta - idle_threads_to_kill;
761
762         ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
763                         ast_taskprocessor_name(pool->tps));
764
765         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
766                         kill_threads, &idle_threads_to_kill);
767
768         ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
769                         ast_taskprocessor_name(pool->tps));
770
771         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
772                         zombify_threads, pool, &active_threads_to_zombify);
773 }
774
775 /*!
776  * \brief Helper struct used for queued operations that change the size of the threadpool
777  */
778 struct set_size_data {
779         /*! The pool whose size is to change */
780         struct ast_threadpool *pool;
781         /*! The requested new size of the pool */
782         unsigned int size;
783 };
784
785 /*!
786  * \brief Allocate and initialize a set_size_data
787  * \param pool The pool for the set_size_data
788  * \param size The size to store in the set_size_data
789  */
790 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
791                 unsigned int size)
792 {
793         struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
794         if (!ssd) {
795                 return NULL;
796         }
797
798         ssd->pool = pool;
799         ssd->size = size;
800         return ssd;
801 }
802
803 /*!
804  * \brief Change the size of the threadpool
805  *
806  * This can either result in shrinking or growing the threadpool depending
807  * on the new desired size and the current size.
808  *
809  * This function is run from the threadpool control taskprocessor thread
810  *
811  * \param data A set_size_data used for determining how to act
812  * \return 0
813  */
814 static int queued_set_size(void *data)
815 {
816         RAII_VAR(struct set_size_data *, ssd, data, ao2_cleanup);
817         struct ast_threadpool *pool = ssd->pool;
818         unsigned int num_threads = ssd->size;
819
820         /* We don't count zombie threads as being "live" when potentially resizing */
821         unsigned int current_size = ao2_container_count(pool->active_threads) +
822                         ao2_container_count(pool->idle_threads);
823
824         if (current_size == num_threads) {
825                 ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
826                           num_threads, current_size);
827                 return 0;
828         }
829
830         if (current_size < num_threads) {
831                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
832                                 activate_thread, pool);
833
834                 /* As the above may have altered the number of current threads update it */
835                 current_size = ao2_container_count(pool->active_threads) +
836                                 ao2_container_count(pool->idle_threads);
837                 grow(pool, num_threads - current_size);
838                 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
839                                 activate_thread, pool);
840         } else {
841                 shrink(pool, current_size - num_threads);
842         }
843
844         threadpool_send_state_changed(pool);
845         return 0;
846 }
847
848 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
849 {
850         struct set_size_data *ssd;
851         SCOPED_AO2LOCK(lock, pool);
852         if (pool->shutting_down) {
853                 return;
854         }
855
856         ssd = set_size_data_alloc(pool, size);
857         if (!ssd) {
858                 return;
859         }
860
861         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
862 }
863
864 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
865                 const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
866 {
867         struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
868         if (!listener) {
869                 return NULL;
870         }
871         listener->callbacks = callbacks;
872         listener->user_data = user_data;
873         return listener;
874 }
875
876 void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
877 {
878         return listener->user_data;
879 }
880
881 struct pool_options_pair {
882         struct ast_threadpool *pool;
883         struct ast_threadpool_options options;
884 };
885
886 struct ast_threadpool *ast_threadpool_create(const char *name,
887                 struct ast_threadpool_listener *listener,
888                 const struct ast_threadpool_options *options)
889 {
890         struct ast_taskprocessor *tps;
891         RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
892         RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
893
894         pool = threadpool_alloc(name, options);
895         if (!pool) {
896                 return NULL;
897         }
898
899         tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
900         if (!tps_listener) {
901                 return NULL;
902         }
903
904         if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
905                 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
906                 return NULL;
907         }
908
909         tps = ast_taskprocessor_create_with_listener(name, tps_listener);
910         if (!tps) {
911                 return NULL;
912         }
913
914         pool->tps = tps;
915         if (listener) {
916                 ao2_ref(listener, +1);
917                 pool->listener = listener;
918         }
919         ast_threadpool_set_size(pool, pool->options.initial_size);
920         ao2_ref(pool, +1);
921         return pool;
922 }
923
924 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
925 {
926         SCOPED_AO2LOCK(lock, pool);
927         if (!pool->shutting_down) {
928                 return ast_taskprocessor_push(pool->tps, task, data);
929         }
930         return -1;
931 }
932
933 void ast_threadpool_shutdown(struct ast_threadpool *pool)
934 {
935         if (!pool) {
936                 return;
937         }
938         /* Shut down the taskprocessors and everything else just
939          * takes care of itself via the taskprocessor callbacks
940          */
941         ao2_lock(pool);
942         pool->shutting_down = 1;
943         ao2_unlock(pool);
944         ast_taskprocessor_unreference(pool->control_tps);
945         ast_taskprocessor_unreference(pool->tps);
946 }
947
948 /*!
949  * A monotonically increasing integer used for worker
950  * thread identification.
951  */
952 static int worker_id_counter;
953
954 static int worker_thread_hash(const void *obj, int flags)
955 {
956         const struct worker_thread *worker = obj;
957
958         return worker->id;
959 }
960
961 static int worker_thread_cmp(void *obj, void *arg, int flags)
962 {
963         struct worker_thread *worker1 = obj;
964         struct worker_thread *worker2 = arg;
965
966         return worker1->id == worker2->id ? CMP_MATCH : 0;
967 }
968
969 /*!
970  * \brief shut a worker thread down
971  *
972  * Set the worker dead and then wait for its thread
973  * to finish executing.
974  *
975  * \param worker The worker thread to shut down
976  */
977 static void worker_shutdown(struct worker_thread *worker)
978 {
979         worker_set_state(worker, DEAD);
980         if (worker->thread != AST_PTHREADT_NULL) {
981                 pthread_join(worker->thread, NULL);
982                 worker->thread = AST_PTHREADT_NULL;
983         }
984 }
985
986 /*!
987  * \brief Worker thread destructor
988  *
989  * Called automatically when refcount reaches 0. Shuts
990  * down the worker thread and destroys its component
991  * parts
992  */
993 static void worker_thread_destroy(void *obj)
994 {
995         struct worker_thread *worker = obj;
996         ast_debug(3, "Destroying worker thread %d\n", worker->id);
997         worker_shutdown(worker);
998         ast_mutex_destroy(&worker->lock);
999         ast_cond_destroy(&worker->cond);
1000 }
1001
1002 /*!
1003  * \brief start point for worker threads
1004  *
1005  * Worker threads start in the active state but may
1006  * immediately go idle if there is no work to be
1007  * done
1008  *
1009  * \param arg The worker thread
1010  * \retval NULL
1011  */
1012 static void *worker_start(void *arg)
1013 {
1014         struct worker_thread *worker = arg;
1015         enum worker_state saved_state;
1016
1017         if (worker->options.thread_start) {
1018                 worker->options.thread_start();
1019         }
1020
1021         ast_mutex_lock(&worker->lock);
1022         while (worker_idle(worker)) {
1023                 ast_mutex_unlock(&worker->lock);
1024                 worker_active(worker);
1025                 ast_mutex_lock(&worker->lock);
1026                 if (worker->state != ALIVE) {
1027                         break;
1028                 }
1029                 threadpool_active_thread_idle(worker->pool, worker);
1030         }
1031         saved_state = worker->state;
1032         ast_mutex_unlock(&worker->lock);
1033
1034         /* Reaching this portion means the thread is
1035          * on death's door. It may have been killed while
1036          * it was idle, in which case it can just die
1037          * peacefully. If it's a zombie, though, then
1038          * it needs to let the pool know so
1039          * that the thread can be removed from the
1040          * list of zombie threads.
1041          */
1042         if (saved_state == ZOMBIE) {
1043                 threadpool_zombie_thread_dead(worker->pool, worker);
1044         }
1045
1046         if (worker->options.thread_end) {
1047                 worker->options.thread_end();
1048         }
1049         return NULL;
1050 }
1051
1052 /*!
1053  * \brief Allocate and initialize a new worker thread
1054  *
1055  * This will create, initialize, and start the thread.
1056  *
1057  * \param pool The threadpool to which the worker will be added
1058  * \retval NULL Failed to allocate or start the worker thread
1059  * \retval non-NULL The newly-created worker thread
1060  */
1061 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
1062 {
1063         struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1064         if (!worker) {
1065                 return NULL;
1066         }
1067         worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
1068         ast_mutex_init(&worker->lock);
1069         ast_cond_init(&worker->cond, NULL);
1070         worker->pool = pool;
1071         worker->thread = AST_PTHREADT_NULL;
1072         worker->state = ALIVE;
1073         worker->options = pool->options;
1074         return worker;
1075 }
1076
1077 static int worker_thread_start(struct worker_thread *worker)
1078 {
1079         return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1080 }
1081
1082 /*!
1083  * \brief Active loop for worker threads
1084  *
1085  * The worker will stay in this loop for its lifetime,
1086  * executing tasks as they become available. If there
1087  * are no tasks currently available, then the thread
1088  * will go idle.
1089  *
1090  * \param worker The worker thread executing tasks.
1091  */
1092 static void worker_active(struct worker_thread *worker)
1093 {
1094         int alive;
1095
1096         /* The following is equivalent to
1097          *
1098          * while (threadpool_execute(worker->pool));
1099          *
1100          * However, reviewers have suggested in the past
1101          * doing that can cause optimizers to (wrongly)
1102          * optimize the code away.
1103          */
1104         do {
1105                 alive = threadpool_execute(worker->pool);
1106         } while (alive);
1107 }
1108
1109 /*!
1110  * \brief Idle function for worker threads
1111  *
1112  * The worker waits here until it gets told by the threadpool
1113  * to wake up.
1114  *
1115  * worker is locked before entering this function.
1116  *
1117  * \param worker The idle worker
1118  * \retval 0 The thread is being woken up so that it can conclude.
1119  * \retval non-zero The thread is being woken up to do more work.
1120  */
1121 static int worker_idle(struct worker_thread *worker)
1122 {
1123         struct timeval start = ast_tvnow();
1124         struct timespec end = {
1125                 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1126                 .tv_nsec = start.tv_usec * 1000,
1127         };
1128         while (!worker->wake_up) {
1129                 if (worker->options.idle_timeout <= 0) {
1130                         ast_cond_wait(&worker->cond, &worker->lock);
1131                 } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1132                         break;
1133                 }
1134         }
1135
1136         if (!worker->wake_up) {
1137                 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1138                 threadpool_idle_thread_dead(worker->pool, worker);
1139                 worker->state = DEAD;
1140         }
1141         worker->wake_up = 0;
1142         return worker->state == ALIVE;
1143 }
1144
1145 /*!
1146  * \brief Change a worker's state
1147  *
1148  * The threadpool calls into this function in order to let a worker know
1149  * how it should proceed.
1150  *
1151  * \retval -1 failure (state transition not permitted)
1152  * \retval 0 success
1153  */
1154 static int worker_set_state(struct worker_thread *worker, enum worker_state state)
1155 {
1156         SCOPED_MUTEX(lock, &worker->lock);
1157
1158         switch (state) {
1159         case ALIVE:
1160                 /* This can occur due to a race condition between being told to go active
1161                  * and an idle timeout happening.
1162                  */
1163                 if (worker->state == DEAD) {
1164                         return -1;
1165                 }
1166                 ast_assert(worker->state != ZOMBIE);
1167                 break;
1168         case DEAD:
1169                 break;
1170         case ZOMBIE:
1171                 ast_assert(worker->state != DEAD);
1172                 break;
1173         }
1174
1175         worker->state = state;
1176         worker->wake_up = 1;
1177         ast_cond_signal(&worker->cond);
1178
1179         return 0;
1180 }
1181
1182 /*! Serializer group shutdown control object. */
1183 struct ast_serializer_shutdown_group {
1184         /*! Shutdown thread waits on this conditional. */
1185         ast_cond_t cond;
1186         /*! Count of serializers needing to shutdown. */
1187         int count;
1188 };
1189
1190 static void serializer_shutdown_group_dtor(void *vdoomed)
1191 {
1192         struct ast_serializer_shutdown_group *doomed = vdoomed;
1193
1194         ast_cond_destroy(&doomed->cond);
1195 }
1196
1197 struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
1198 {
1199         struct ast_serializer_shutdown_group *shutdown_group;
1200
1201         shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
1202         if (!shutdown_group) {
1203                 return NULL;
1204         }
1205         ast_cond_init(&shutdown_group->cond, NULL);
1206         return shutdown_group;
1207 }
1208
1209 int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
1210 {
1211         int remaining;
1212         ast_mutex_t *lock;
1213
1214         if (!shutdown_group) {
1215                 return 0;
1216         }
1217
1218         lock = ao2_object_get_lockaddr(shutdown_group);
1219         ast_assert(lock != NULL);
1220
1221         ao2_lock(shutdown_group);
1222         if (timeout) {
1223                 struct timeval start;
1224                 struct timespec end;
1225
1226                 start = ast_tvnow();
1227                 end.tv_sec = start.tv_sec + timeout;
1228                 end.tv_nsec = start.tv_usec * 1000;
1229                 while (shutdown_group->count) {
1230                         if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
1231                                 /* Error or timed out waiting for the count to reach zero. */
1232                                 break;
1233                         }
1234                 }
1235         } else {
1236                 while (shutdown_group->count) {
1237                         if (ast_cond_wait(&shutdown_group->cond, lock)) {
1238                                 /* Error */
1239                                 break;
1240                         }
1241                 }
1242         }
1243         remaining = shutdown_group->count;
1244         ao2_unlock(shutdown_group);
1245         return remaining;
1246 }
1247
1248 /*!
1249  * \internal
1250  * \brief Increment the number of serializer members in the group.
1251  * \since 13.5.0
1252  *
1253  * \param shutdown_group Group shutdown controller.
1254  *
1255  * \return Nothing
1256  */
1257 static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
1258 {
1259         ao2_lock(shutdown_group);
1260         ++shutdown_group->count;
1261         ao2_unlock(shutdown_group);
1262 }
1263
1264 /*!
1265  * \internal
1266  * \brief Decrement the number of serializer members in the group.
1267  * \since 13.5.0
1268  *
1269  * \param shutdown_group Group shutdown controller.
1270  *
1271  * \return Nothing
1272  */
1273 static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
1274 {
1275         ao2_lock(shutdown_group);
1276         --shutdown_group->count;
1277         if (!shutdown_group->count) {
1278                 ast_cond_signal(&shutdown_group->cond);
1279         }
1280         ao2_unlock(shutdown_group);
1281 }
1282
1283 struct serializer {
1284         /*! Threadpool the serializer will use to process the jobs. */
1285         struct ast_threadpool *pool;
1286         /*! Which group will wait for this serializer to shutdown. */
1287         struct ast_serializer_shutdown_group *shutdown_group;
1288 };
1289
1290 static void serializer_dtor(void *obj)
1291 {
1292         struct serializer *ser = obj;
1293
1294         ao2_cleanup(ser->pool);
1295         ser->pool = NULL;
1296         ao2_cleanup(ser->shutdown_group);
1297         ser->shutdown_group = NULL;
1298 }
1299
1300 static struct serializer *serializer_create(struct ast_threadpool *pool,
1301         struct ast_serializer_shutdown_group *shutdown_group)
1302 {
1303         struct serializer *ser;
1304
1305         ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1306         if (!ser) {
1307                 return NULL;
1308         }
1309         ao2_ref(pool, +1);
1310         ser->pool = pool;
1311         ser->shutdown_group = ao2_bump(shutdown_group);
1312         return ser;
1313 }
1314
1315 AST_THREADSTORAGE_RAW(current_serializer);
1316
1317 static int execute_tasks(void *data)
1318 {
1319         struct ast_taskprocessor *tps = data;
1320
1321         ast_threadstorage_set_ptr(&current_serializer, tps);
1322         while (ast_taskprocessor_execute(tps)) {
1323                 /* No-op */
1324         }
1325         ast_threadstorage_set_ptr(&current_serializer, NULL);
1326
1327         ast_taskprocessor_unreference(tps);
1328         return 0;
1329 }
1330
1331 static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
1332 {
1333         if (was_empty) {
1334                 struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1335                 struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
1336
1337                 if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1338                         ast_taskprocessor_unreference(tps);
1339                 }
1340         }
1341 }
1342
1343 static int serializer_start(struct ast_taskprocessor_listener *listener)
1344 {
1345         /* No-op */
1346         return 0;
1347 }
1348
1349 static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
1350 {
1351         struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1352
1353         if (ser->shutdown_group) {
1354                 serializer_shutdown_group_dec(ser->shutdown_group);
1355         }
1356         ao2_cleanup(ser);
1357 }
1358
1359 static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
1360         .task_pushed = serializer_task_pushed,
1361         .start = serializer_start,
1362         .shutdown = serializer_shutdown,
1363 };
1364
1365 struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
1366 {
1367         return ast_threadstorage_get_ptr(&current_serializer);
1368 }
1369
1370 struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
1371         struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
1372 {
1373         struct serializer *ser;
1374         struct ast_taskprocessor_listener *listener;
1375         struct ast_taskprocessor *tps;
1376
1377         ser = serializer_create(pool, shutdown_group);
1378         if (!ser) {
1379                 return NULL;
1380         }
1381
1382         listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
1383         if (!listener) {
1384                 ao2_ref(ser, -1);
1385                 return NULL;
1386         }
1387
1388         tps = ast_taskprocessor_create_with_listener(name, listener);
1389         if (!tps) {
1390                 /* ser ref transferred to listener but not cleaned without tps */
1391                 ao2_ref(ser, -1);
1392         } else if (shutdown_group) {
1393                 serializer_shutdown_group_inc(shutdown_group);
1394         }
1395
1396         ao2_ref(listener, -1);
1397         return tps;
1398 }
1399
1400 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
1401 {
1402         return ast_threadpool_serializer_group(name, pool, NULL);
1403 }
1404
1405 long ast_threadpool_queue_size(struct ast_threadpool *pool)
1406 {
1407         return ast_taskprocessor_size(pool->tps);
1408 }