/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2012, Digium, Inc.
 *
 * Mark Michelson <mmmichelson@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"

#define THREAD_BUCKETS 89
/*! Source of unique identifiers for worker threads */
static int id_counter;
struct ast_threadpool {
	/*! Listener notified of changes in the pool's state */
	struct ast_threadpool_listener *listener;
	/*! Container of workers currently running or able to run tasks */
	struct ao2_container *active_threads;
	/*! Container of workers waiting for tasks */
	struct ao2_container *idle_threads;
	/*! Container of workers that are finishing up before dying */
	struct ao2_container *zombie_threads;
	/*! The taskprocessor from which workers pull tasks */
	struct ast_taskprocessor *tps;
	/*! Taskprocessor used to serialize control operations on the pool */
	struct ast_taskprocessor *control_tps;
};
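/*!
 * \brief States that a worker thread can be in
 *
 * A worker is ALIVE while it executes tasks or idles, becomes a ZOMBIE when
 * the pool shrinks while the worker is still active, and is marked DEAD when
 * it should exit.
 */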
enum worker_state {
	/*! The worker is executing tasks or idling */
	ALIVE,
	/*! The worker finishes its current work and then dies */
	ZOMBIE,
	/*! The worker should exit */
	DEAD,
};

struct worker_thread {
	/*! Unique id used for hashing in the pool's containers */
	int id;
	/*! Condition used to wake an idle worker */
	ast_cond_t cond;
	/*! Protects the worker's state and wake_up flag */
	ast_mutex_t lock;
	/*! The underlying thread of execution */
	pthread_t thread;
	/*! The pool to which this worker belongs */
	struct ast_threadpool *pool;
	/*! The worker's current state */
	enum worker_state state;
	/*! Set when an idle worker should stop waiting */
	int wake_up;
};
static int worker_thread_hash(const void *obj, int flags)
{
	const struct worker_thread *worker = obj;

	return worker->id;
}
static int worker_thread_cmp(void *obj, void *arg, int flags)
{
	struct worker_thread *worker1 = obj;
	struct worker_thread *worker2 = arg;

	return worker1->id == worker2->id ? CMP_MATCH : 0;
}
static void worker_thread_destroy(void *obj)
{
	struct worker_thread *worker = obj;

	ast_mutex_destroy(&worker->lock);
	ast_cond_destroy(&worker->cond);
}
static int worker_active(struct worker_thread *worker);
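/*!
 * \brief Thread entry point for a worker
 *
 * The worker runs tasks (and idles between them) until it is told to die.
 */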
static void *worker_start(void *arg)
{
	struct worker_thread *worker = arg;

	worker_active(worker);
	return NULL;
}
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
	struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);

	if (!worker) {
		return NULL;
	}
	worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
	ast_mutex_init(&worker->lock);
	ast_cond_init(&worker->cond, NULL);
	worker->pool = pool;
	worker->thread = AST_PTHREADT_NULL;
	worker->state = ALIVE;
	if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
		ao2_ref(worker, -1);
		return NULL;
	}
	return worker;
}
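/*!
 * \brief Notify the threadpool listener of the current thread counts
 *
 * This is always called from a task running in the pool's control
 * taskprocessor, so listener notifications are serialized.
 */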
static void threadpool_send_state_changed(struct ast_threadpool *pool)
{
	int active_size = ao2_container_count(pool->active_threads);
	int idle_size = ao2_container_count(pool->idle_threads);
	int zombie_size = ao2_container_count(pool->zombie_threads);

	pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
}
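/*!
 * \brief A (pool, worker) pair passed as data to control taskprocessor tasks
 *
 * The pair holds references to both objects for the lifetime of the queued task.
 */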
struct thread_worker_pair {
	struct ast_threadpool *pool;
	struct worker_thread *worker;
};
static void thread_worker_pair_destructor(void *obj)
{
	struct thread_worker_pair *pair = obj;

	ao2_ref(pair->pool, -1);
	ao2_ref(pair->worker, -1);
}
static struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);

	if (!pair) {
		return NULL;
	}
	ao2_ref(pool, +1);
	pair->pool = pool;
	ao2_ref(worker, +1);
	pair->worker = worker;

	return pair;
}
static int queued_active_thread_idle(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_link(pair->pool->idle_threads, pair->worker);
	ao2_unlink(pair->pool->active_threads, pair->worker);

	threadpool_send_state_changed(pair->pool);

	ao2_ref(pair, -1);
	return 0;
}
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);

	if (!pair) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
}
static int queued_zombie_thread_dead(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_unlink(pair->pool->zombie_threads, pair->worker);
	threadpool_send_state_changed(pair->pool);

	ao2_ref(pair, -1);
	return 0;
}
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);

	if (!pair) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
}
static int worker_idle(struct worker_thread *worker)
{
	SCOPED_MUTEX(lock, &worker->lock);

	if (worker->state != ALIVE) {
		return 0;
	}
	threadpool_active_thread_idle(worker->pool, worker);
	while (!worker->wake_up) {
		ast_cond_wait(&worker->cond, lock);
	}
	worker->wake_up = 0;
	return worker->state == ALIVE;
}
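/*!
 * \brief Ask the pool's taskprocessor to execute a pending task, if any
 */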
static int threadpool_execute(struct ast_threadpool *pool)
{
	return ast_taskprocessor_execute(pool->tps);
}
static int worker_active(struct worker_thread *worker)
{
	int alive = 1;

	while (alive) {
		if (threadpool_execute(worker->pool)) {
			alive = worker_idle(worker);
		}
	}

	/* Reaching this point means the thread is on death's door. It may
	 * have been killed while it was idle, in which case it can just die
	 * peacefully. If it's a zombie, though, then it needs to let the
	 * pool know so that it can be removed from the list of zombie
	 * threads.
	 */
	if (worker->state == ZOMBIE) {
		threadpool_zombie_thread_dead(worker->pool, worker);
	}

	return 0;
}
static void worker_set_state(struct worker_thread *worker, enum worker_state state)
{
	SCOPED_MUTEX(lock, &worker->lock);

	worker->state = state;
	worker->wake_up = 1;
	ast_cond_signal(&worker->cond);
}
static int worker_shutdown(void *obj, void *arg, int flags)
{
	struct worker_thread *worker = obj;

	worker_set_state(worker, DEAD);
	if (worker->thread != AST_PTHREADT_NULL) {
		pthread_join(worker->thread, NULL);
		worker->thread = AST_PTHREADT_NULL;
	}
	return 0;
}
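/*!
 * \brief Taskprocessor listener destroy callback
 *
 * Releases the containers and listener held by the threadpool. By the time
 * this runs, all worker threads are expected to have been shut down.
 */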
static void threadpool_tps_listener_destroy(void *private_data)
{
	struct ast_threadpool *pool = private_data;

	/* XXX Probably should let the listener know we're being destroyed? */

	/* Threads should all be shut down by now, so this should be a painless
	 * operation.
	 */
	ao2_ref(pool->active_threads, -1);
	ao2_ref(pool->idle_threads, -1);
	ao2_ref(pool->zombie_threads, -1);
	ao2_ref(pool->listener, -1);
}
static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
{
	RAII_VAR(struct ast_threadpool *, pool,
			ao2_alloc(sizeof(*pool), threadpool_tps_listener_destroy), ao2_cleanup);

	if (!pool) {
		return NULL;
	}
	pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
	if (!pool->control_tps) {
		return NULL;
	}
	pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->active_threads) {
		return NULL;
	}
	pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->idle_threads) {
		return NULL;
	}
	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->zombie_threads) {
		return NULL;
	}

	pool->tps = listener->tps;

	/* Hand a new reference back to the caller; the RAII_VAR cleanup drops ours */
	ao2_ref(pool, +1);
	return pool;
}
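/*!
 * \brief Data queued to the control taskprocessor when a task is pushed
 * to the pool's main taskprocessor
 */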
struct task_pushed_data {
	struct ast_threadpool *pool;
	int was_empty;
};
static void task_pushed_data_destroy(void *obj)
{
	struct task_pushed_data *tpd = obj;

	ao2_ref(tpd->pool, -1);
}
static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
		int was_empty)
{
	struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd),
			task_pushed_data_destroy);

	if (!tpd) {
		return NULL;
	}
	ao2_ref(pool, +1);
	tpd->pool = pool;
	tpd->was_empty = was_empty;

	return tpd;
}
static int activate_threads(void *obj, void *arg, int flags)
{
	struct worker_thread *worker = obj;
	struct ast_threadpool *pool = arg;

	ao2_link(pool->active_threads, worker);
	worker_set_state(worker, ALIVE);

	/* Return CMP_MATCH so that OBJ_UNLINK removes the worker from the idle container */
	return CMP_MATCH;
}
static int handle_task_pushed(void *data)
{
	struct task_pushed_data *tpd = data;
	struct ast_threadpool *pool = tpd->pool;
	int was_empty = tpd->was_empty;

	pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE,
			activate_threads, pool);

	ao2_ref(tpd, -1);
	return 0;
}
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
		int was_empty)
{
	struct ast_threadpool *pool = listener->private_data;
	struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);

	if (!tpd) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
}
static int handle_emptied(void *data)
{
	struct ast_threadpool *pool = data;

	pool->listener->callbacks->emptied(pool->listener);
	return 0;
}
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
	struct ast_threadpool *pool = listener->private_data;

	ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
}
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
	/*
	 * The threadpool triggers the taskprocessor to shut down. As a result,
	 * we have the freedom of shutting things down in three stages:
	 *
	 * 1) Before the taskprocessor is shut down
	 * 2) During taskprocessor shutdown (here)
	 * 3) After taskprocessor shutdown
	 *
	 * In the spirit of the taskprocessor shutdown, this is where we make
	 * sure that all the worker threads are no longer executing. We could
	 * just do this before we even shut down the taskprocessor, but this
	 * feels more "right".
	 */
	struct ast_threadpool *pool = listener->private_data;

	ao2_callback(pool->active_threads, 0, worker_shutdown, NULL);
	ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL);
	ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL);
}
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
	.alloc = threadpool_tps_listener_alloc,
	.task_pushed = threadpool_tps_task_pushed,
	.emptied = threadpool_tps_emptied,
	.shutdown = threadpool_tps_shutdown,
	.destroy = threadpool_tps_listener_destroy,
};
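/*!
 * \brief Add new active worker threads to the pool
 *
 * Called from the control taskprocessor when the pool is resized upward.
 */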
static void grow(struct ast_threadpool *pool, int delta)
{
	int i;

	for (i = 0; i < delta; ++i) {
		struct worker_thread *worker = worker_thread_alloc(pool);

		if (!worker) {
			/* XXX Allocation failure; stop growing */
			break;
		}
		ao2_link(pool->active_threads, worker);
	}
}
static int kill_threads(void *obj, void *arg, int flags)
{
	struct worker_thread *worker = obj;
	int *num_to_kill = arg;

	if ((*num_to_kill)-- > 0) {
		worker_shutdown(worker, arg, flags);
		return CMP_MATCH;
	}

	return CMP_STOP;
}
static int zombify_threads(void *obj, void *arg, void *data, int flags)
{
	struct worker_thread *worker = obj;
	struct ast_threadpool *pool = arg;
	int *num_to_zombify = data;

	if ((*num_to_zombify)-- > 0) {
		ao2_link(pool->zombie_threads, worker);
		worker_set_state(worker, ZOMBIE);
		return CMP_MATCH;
	}

	return CMP_STOP;
}
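/*!
 * \brief Remove worker threads from the pool
 *
 * Idle threads are killed outright; if that is not enough, active threads
 * are turned into zombies, which finish their current work and then die.
 */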
static void shrink(struct ast_threadpool *pool, int delta)
{
	/*
	 * Preference is to kill idle threads, but we'll move on to
	 * zombifying active threads if we have to.
	 */
	int idle_threads = ao2_container_count(pool->idle_threads);
	int idle_threads_to_kill = MIN(delta, idle_threads);
	int active_threads_to_zombify = delta - idle_threads_to_kill;

	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
			kill_threads, &idle_threads_to_kill);

	ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
			zombify_threads, pool, &active_threads_to_zombify);
}
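/*!
 * \brief Data queued to the control taskprocessor for a pool resize request
 */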
struct set_size_data {
	struct ast_threadpool *pool;
	unsigned int size;
};
static void set_size_data_destroy(void *obj)
{
	struct set_size_data *ssd = obj;

	ao2_ref(ssd->pool, -1);
}
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
		unsigned int size)
{
	struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);

	if (!ssd) {
		return NULL;
	}
	ao2_ref(pool, +1);
	ssd->pool = pool;
	ssd->size = size;

	return ssd;
}
static int queued_set_size(void *data)
{
	struct set_size_data *ssd = data;
	struct ast_threadpool *pool = ssd->pool;
	int num_threads = ssd->size;

	/* We don't count zombie threads as being "live" when potentially resizing */
	int current_size = ao2_container_count(pool->active_threads) +
			ao2_container_count(pool->idle_threads);

	if (current_size == num_threads) {
		ao2_ref(ssd, -1);
		return 0;
	}

	if (current_size < num_threads) {
		grow(pool, num_threads - current_size);
	} else {
		shrink(pool, current_size - num_threads);
	}

	threadpool_send_state_changed(pool);
	ao2_ref(ssd, -1);
	return 0;
}
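/*!
 * \brief Set the number of threads in the pool
 *
 * The resize is performed asynchronously on the pool's control
 * taskprocessor, so the thread counts may not change immediately.
 */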
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
	struct set_size_data *ssd;

	ssd = set_size_data_alloc(pool, size);
	if (!ssd) {
		return;
	}

	ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
}
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
{
	struct ast_threadpool *pool;
	struct ast_taskprocessor *tps;
	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
			ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
			ao2_cleanup);

	if (!tps_listener) {
		return NULL;
	}

	tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
	if (!tps) {
		return NULL;
	}

	pool = tps_listener->private_data;

	/* The pool holds its own reference to the threadpool listener */
	ao2_ref(listener, +1);
	pool->listener = listener;

	ast_threadpool_set_size(pool, initial_size);
	return pool;
}
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
	/* Pretty simple really. We just shut down the
	 * taskprocessors and everything else just
	 * takes care of itself via the taskprocessor callbacks.
	 */
	ast_taskprocessor_unreference(pool->control_tps);
	ast_taskprocessor_unreference(pool->tps);
}
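/*
 * Example usage (a sketch, not part of the implementation): how a caller
 * might create, resize, and shut down a pool. How the threadpool listener
 * is constructed is not shown in this file, so the listener here is assumed
 * to come from elsewhere and the block is kept out of the build.
 */
#if 0
static void threadpool_usage_sketch(struct ast_threadpool_listener *listener)
{
	/* Start a pool with five worker threads. The listener's callbacks
	 * (state_changed, tps_task_pushed, emptied) are invoked from the
	 * pool's control taskprocessor as the pool changes state.
	 */
	struct ast_threadpool *pool = ast_threadpool_create(listener, 5);

	if (!pool) {
		return;
	}

	/* Grow the pool; the resize happens asynchronously on the control
	 * taskprocessor.
	 */
	ast_threadpool_set_size(pool, 10);

	/* Shut down the taskprocessors; worker threads are joined via the
	 * taskprocessor shutdown callback.
	 */
	ast_threadpool_shutdown(pool);
}
#endif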