2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2012, Digium, Inc.
6 * Mark Michelson <mmmichelson@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
/* Bucket count passed to ao2_container_alloc() for each of the three worker containers. */
25 #define THREAD_BUCKETS 89
/* Source of worker ids; worker_thread_alloc() advances it with ast_atomic_fetchadd_int(). */
27 static int id_counter;
/*
 * A pool of worker threads, tracked in three containers according to state.
 * Only some members are visible in this excerpt; other code in the file also
 * reads pool->control_tps and pool->listener.
 */
29 struct ast_threadpool {
/*
 * Listener for the pool.
 * NOTE(review): threadpool_send_state_changed() reads pool->listener, not
 * pool->threadpool_listener. Confirm which member name is intended and make
 * the two agree.
 */
30 struct ast_threadpool_listener *threadpool_listener;
/* Workers are linked here by grow(). */
31 struct ao2_container *active_threads;
/* Workers are moved here from active_threads by queued_active_thread_idle(). */
32 struct ao2_container *idle_threads;
/* Workers are linked here by zombify_threads() and unlinked by queued_zombie_thread_dead(). */
33 struct ao2_container *zombie_threads;
/*
 * One worker thread belonging to a pool. Members used elsewhere in the file
 * but not visible in this excerpt: id, lock, cond, thread, wake_up.
 */
42 struct worker_thread {
/* Pool this worker reports to; read by worker_idle() and worker_active(). */
47 struct ast_threadpool *pool;
/* Compared against ALIVE and ZOMBIE by worker_idle() and worker_active(). */
48 enum worker_state state;
/*
 * Hash callback handed to ao2_container_alloc() for the worker containers.
 * Only the conversion of obj is visible here; the value returned is not shown.
 * NOTE(review): obj is const void * while worker is a non-const pointer, so
 * this initialisation discards the qualifier. Declaring worker as
 * const struct worker_thread * would avoid that.
 */
52 static int worker_thread_hash(const void *obj)
54 struct worker_thread *worker= obj;
/*
 * Compare callback handed to ao2_container_alloc() for the worker containers.
 * obj and arg are both treated as workers; they match when their ids are equal.
 * Returns CMP_MATCH on equality and 0 otherwise. flags is not used in the
 * visible lines.
 */
59 static int worker_thread_cmp(void *obj, void *arg, int flags)
61 struct worker_thread *worker1 = obj;
62 struct worker_thread *worker2 = arg;
64 return worker1->id == worker2->id ? CMP_MATCH : 0;
/*
 * Allocate a worker, give it a fresh id, initialise its lock and condition,
 * mark it ALIVE and start its thread running worker_active().
 * The NULL check, error path and return statement are not visible here.
 * NOTE(review): the return type is written without the struct keyword and no
 * typedef named worker_thread is visible; struct worker_thread * looks intended.
 */
67 static worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
/*
 * NOTE(review): every other ao2_alloc() call in this file passes
 * (size, destructor). Here the size is 1 and sizeof(*worker) sits in the
 * destructor position, so the arguments look swapped. The intended destructor
 * is not visible in this excerpt.
 */
69 struct worker_thread *worker = ao2_alloc(1, sizeof(*worker));
/* fetchadd returns the value from before the increment, so ids count up from the initial counter value. */
74 worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
75 ast_mutex_init(&worker->lock);
76 ast_cond_init(&worker->cond, NULL);
78 worker->thread = AST_PTHREADT_NULL;
79 worker->state = ALIVE;
/*
 * NOTE(review): worker_active() is defined later in the file as
 * int (struct worker_thread *) and no earlier declaration is visible. A thread
 * start routine is normally void *(void *); confirm and add a prototype.
 */
80 if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) {
/*
 * Report the current number of active, idle and zombie workers to the pool
 * listener through its state_changed callback.
 */
88 static void threadpool_send_state_changed(struct ast_threadpool *pool)
90 int active_size = ao2_container_count(pool->active_threads);
91 int idle_size = ao2_container_count(pool->idle_threads);
92 int zombie_size = ao2_container_count(pool->zombie_threads);
/*
 * NOTE(review): the visible part of struct ast_threadpool declares the member
 * as threadpool_listener, not listener. Unless a listener member exists in the
 * lines not shown, this should read pool->threadpool_listener in both places.
 */
94 pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
/*
 * Bundles a pool with one of its workers so that both can travel as the single
 * data pointer of a queued task. thread_worker_pair_destructor() drops one
 * reference on each member.
 */
97 struct thread_worker_pair {
98 struct ast_threadpool *pool;
99 struct worker_thread *worker;
/*
 * Destructor given to ao2_alloc() by thread_worker_pair_init().
 * Drops one reference on the pool and one on the worker held by the pair.
 */
102 static void thread_worker_pair_destructor(void *obj)
104 struct thread_worker_pair *pair = obj;
105 ao2_ref(pair->pool, -1);
106 ao2_ref(pair->worker, -1);
/*
 * Allocate a pair object that stores the given pool and worker.
 * The allocation check and the return statement are not visible here.
 * NOTE(review): unlike its neighbours this helper is not declared static;
 * the only visible callers are in this file.
 */
109 struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
110 struct worker_thread *worker)
112 struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
/*
 * NOTE(review): elsewhere in this file ao2_ref() is called with two arguments
 * (object, delta). These two calls pass one argument and use the result as a
 * pointer. Taking the reference with ao2_ref(x, +1) and then storing x
 * separately looks like what is meant; confirm against the astobj2 API.
 */
117 pair->pool = ao2_ref(pool);
118 pair->worker = ao2_ref(worker);
/*
 * Task body that moves a worker from the active container to the idle
 * container and then reports the new counts to the listener.
 * data is a struct thread_worker_pair. The return value and any release of the
 * pair are not visible in this excerpt.
 */
122 static int queued_active_thread_idle(void *data)
124 struct thread_worker_pair *pair = data;
/* Link into idle before unlinking from active, so the worker is always in at least one container. */
126 ao2_link(pair->pool->idle_threads, pair->worker);
127 ao2_unlink(pair->pool->active_threads, pair->worker);
129 threadpool_send_state_changed(pair->pool);
/*
 * Ask the pool, through its control taskprocessor, to move a worker from
 * active to idle. Called from worker_idle().
 * Handling of a failed pair allocation is not visible here.
 */
135 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
136 struct worker_thread *worker)
138 struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
/*
 * NOTE(review): this runs queued_active_thread_idle(pair) immediately on the
 * calling thread and passes its int result to the push call.
 * ast_threadpool_set_size() pushes with three arguments (tps, function, data),
 * so this presumably should be
 * ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair).
 */
143 ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair));
/*
 * Task body that removes a worker from the zombie container and then reports
 * the new counts to the listener.
 * data is a struct thread_worker_pair. The return value and any release of the
 * pair are not visible in this excerpt.
 */
146 static int queued_zombie_thread_dead(void *data)
148 struct thread_worker_pair *pair = data;
150 ao2_unlink(pair->pool->zombie_threads, pair->worker);
151 threadpool_send_state_changed(pair->pool);
/*
 * Ask the pool, through its control taskprocessor, to drop a zombie worker
 * that is about to exit. Called from worker_active().
 * Handling of a failed pair allocation is not visible here.
 */
157 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
158 struct worker_thread *worker)
160 struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
/*
 * NOTE(review): this runs queued_zombie_thread_dead(pair) immediately on the
 * calling thread and passes its int result to the push call.
 * ast_threadpool_set_size() pushes with three arguments (tps, function, data),
 * so this presumably should be
 * ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair).
 */
165 ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair));
/*
 * Park a worker until it is woken.
 * Holds worker->lock for the whole function through SCOPED_MUTEX.
 * Returns non-zero when the worker is still ALIVE after waking.
 */
168 static int worker_idle(struct worker_thread *worker)
170 SCOPED_MUTEX(lock, &worker->lock);
/* The body of this early-out branch is not visible in this excerpt. */
171 if (worker->state != ALIVE) {
/* Tell the pool this worker is idle; note that worker->lock is held during the call. */
174 threadpool_active_thread_idle(worker->pool, worker);
/* Re-check the flag after every wake-up so a wake without wake_up set keeps waiting. */
175 while (!worker->wake_up) {
176 ast_cond_wait(&worker->cond, lock);
/* Consume the wake-up so the next call waits again. */
178 worker->wake_up = false;
179 return worker->state == ALIVE;
/*
 * Main routine of a worker thread; handed to ast_pthread_create() by
 * worker_thread_alloc(). The loop construct, the declaration of alive and the
 * return statement are not visible in this excerpt.
 * NOTE(review): as a thread start routine this would normally be declared
 * void *worker_active(void *arg). It is also referenced by
 * worker_thread_alloc() before this definition with no visible prototype.
 */
182 static int worker_active(struct worker_thread *worker)
/* A non-zero result from threadpool_execute() sends the worker to worker_idle(), whose result is kept in alive. */
186 if (threadpool_execute(worker->pool)) {
187 alive = worker_idle(worker);
191 /* Reaching this portion means the thread is
192 * on death's door. It may have been killed while
193 * it was idle, in which case it can just die
194 * peacefully. If it's a zombie, though, then
195 * it needs to let the pool know so
196 * that the thread can be removed from the
197 * list of zombie threads.
199 if (worker->state == ZOMBIE) {
200 threadpool_zombie_thread_dead(worker->pool, worker);
// Listener alloc callback (registered as .alloc in threadpool_tps_listener_callbacks).
// Allocates the pool object, obtains its control taskprocessor and creates the
// three worker containers. The failure branches and the return statement are
// not visible in this excerpt.
207 static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
// NOTE(review): the type is written without the struct keyword and no typedef
// named ast_threadpool is visible; struct ast_threadpool * looks intended.
// threadpool_destroy is not defined in the visible lines.
209 RAII_VAR(ast_threadpool *, pool,
210 ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup);
// NOTE(review): the first argument is only a placeholder comment, so the call
// has an empty argument. A taskprocessor name still has to be supplied.
212 pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT);
213 if (!pool->control_tps) {
216 pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
217 if (!pool->active_threads) {
220 pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
221 if (!pool->idle_threads) {
224 pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
// NOTE(review): the member assigned on the previous line is zombie_threads.
// This check reads zombie_thread (no trailing s) and should test
// pool->zombie_threads.
225 if (!pool->zombie_thread) {
/* Listener callback registered as .task_pushed in threadpool_tps_listener_callbacks; its body is not visible in this excerpt. */
233 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener)
/* Listener callback registered as .emptied in threadpool_tps_listener_callbacks; its body is not visible in this excerpt. */
238 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
/* Listener callback registered as .shutdown in threadpool_tps_listener_callbacks; its body is not visible in this excerpt. */
243 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
/* Listener callback registered as .destroy in threadpool_tps_listener_callbacks; its body is not visible in this excerpt. */
248 static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener)
/*
 * Callback table that ties the five threadpool_tps_* functions in this file to
 * a taskprocessor listener. threadpool_tps_alloc() passes its address to
 * ast_taskprocessor_listener_alloc().
 */
253 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
254 .alloc = threadpool_tps_listener_alloc,
255 .task_pushed = threadpool_tps_task_pushed,
256 .emptied = threadpool_tps_emptied,
257 .shutdown = threadpool_tps_shutdown,
258 .destroy = threadpool_tps_listener_destroy,
262 * \brief Allocate the taskprocessor to be used for the threadpool
264 * We use a custom taskprocessor listener. We allocate our custom
265 * listener and then create a taskprocessor.
/*
 * NOTE(review): ast_threadpool_create() stores the result of this function in
 * a taskprocessor pointer and reads tps->listener from it, and the value
 * returned below comes from ast_taskprocessor_create_with_listener(). The
 * declared return type of struct ast_taskprocessor_listener * therefore looks
 * wrong; struct ast_taskprocessor * is presumably intended.
 */
267 static struct ast_taskprocessor_listener *threadpool_tps_alloc(void)
/*
 * NOTE(review): struct threadpool_tps_listener is not defined in the visible
 * lines, while the callbacks in this file all take
 * struct ast_taskprocessor_listener *. Confirm the intended type. The cleanup
 * argument of this RAII_VAR and the allocation check are not visible here.
 */
269 RAII_VAR(struct threadpool_tps_listener *, tps_listener,
270 ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
277 return ast_taskprocessor_create_with_listener(tps_listener);
/*
 * Add delta new workers to the pool. Each new worker is linked into the
 * active container. The declaration of i and the handling of a failed
 * allocation are not visible in this excerpt.
 */
280 static void grow(struct ast_threadpool *pool, int delta)
283 for (i = 0; i < delta; ++i) {
284 struct worker_thread *worker = worker_thread_alloc(pool);
289 ao2_link(pool->active_threads, worker);
/*
 * ao2_callback() visitor used by shrink() on the idle container.
 * arg points at a countdown of how many workers to select. The branch body
 * and the return values are not visible in this excerpt.
 */
293 static int kill_threads(void *obj, void *arg, int flags)
295 int *num_to_kill = arg;
/* Post-decrement: the test uses the old value, so the branch is taken once per remaining unit of the countdown. */
297 if ((*num_to_kill)-- > 0) {
/*
 * ao2_callback_data() visitor used by shrink() on the active container.
 * obj is the worker, arg is the pool and data points at a countdown of how
 * many workers to select. Selected workers are linked into the zombie
 * container. The rest of the branch and the return values are not visible here.
 */
304 static int zombify_threads(void *obj, void *arg, void *data, int flags)
306 struct worker_thread *worker = obj;
307 struct ast_threadpool *pool = arg;
308 int *num_to_zombify = data;
/* Post-decrement: the test uses the old value, so the branch is taken once per remaining unit of the countdown. */
310 if ((*num_to_zombify)-- > 0) {
311 ao2_link(pool->zombie_threads, worker);
/*
 * Remove delta workers from the pool. Idle workers are taken first; any
 * remainder is taken from the active container and handed to zombify_threads().
 */
318 static void shrink(struct ast_threadpool *pool, int delta)
321 * Preference is to kill idle threads, but
322 * we'll move on to deactivating active threads
325 int idle_threads = ao2_container_count(pool->idle_threads);
326 int idle_threads_to_kill = MIN(delta, idle_threads);
327 int active_threads_to_zombify = delta - idle_threads_to_kill;
/*
 * NOTE(review): both traversals pass OBJ_NOLOCK. No locking of either
 * container is visible in this excerpt; confirm the containers are locked
 * around these calls or that access is otherwise serialised.
 */
330 ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
331 kill_threads, &idle_threads_to_kill);
333 ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
334 zombify_threads, pool, &active_threads_to_zombify);
/*
 * Payload for a queued resize request. queued_set_size() also reads a size
 * member that is not visible in this excerpt.
 */
337 struct set_size_data {
/*
 * NOTE(review): queued_set_size() copies this member into a
 * struct ast_threadpool *, and no struct threadpool is declared in the visible
 * lines. The type here should presumably be struct ast_threadpool *.
 */
338 struct threadpool *pool;
/*
 * Destructor given to ao2_alloc() by ast_threadpool_set_size().
 * Drops the reference on the pool held by the request.
 * NOTE(review): unlike its neighbours this function is not declared static;
 * the only visible use is in this file.
 */
342 void set_size_data_destroy(void *obj)
344 struct set_size_data *ssd = obj;
345 ao2_ref(ssd->pool, -1);
/*
 * Task body that resizes the pool to the size carried in the request.
 * data is a struct set_size_data. Grows or shrinks by the difference between
 * the requested size and the current count, then reports the new counts to the
 * listener. The return statements are not visible in this excerpt.
 */
348 static int queued_set_size(void *data)
350 struct set_size_data *ssd = data;
351 struct ast_threadpool *pool = ssd->pool;
352 int num_threads = ssd->size;
354 /* We don't count zombie threads as being "live" when potentially resizing */
355 int current_size = ao2_container_count(pool->active_threads) +
356 ao2_container_count(pool->idle_threads);
/*
 * NOTE(review): this is an assignment, not a comparison. It overwrites
 * current_size with num_threads and takes the branch whenever num_threads is
 * non-zero, so the grow and shrink code below never sees the real difference.
 * The condition should be current_size == num_threads.
 */
358 if (current_size = num_threads) {
362 if (current_size < num_threads) {
363 grow(pool, num_threads - current_size);
365 shrink(pool, current_size - num_threads);
368 threadpool_send_state_changed(pool);
/*
 * NOTE(review): set_size_data is the struct tag, not a variable. The request
 * object in scope is ssd, so this should be ao2_ref(ssd, -1).
 */
369 ao2_ref(set_size_data, -1);
/*
 * Request that the pool be resized to the given number of threads.
 * The resize itself runs later in queued_set_size() on the control
 * taskprocessor. The size validation condition, the allocation check and the
 * assignment of the size member are not visible in this excerpt.
 */
372 void ast_threadpool_set_size(struct ast_threadpool *pool, int size)
374 struct set_size_data *ssd;
/* Logged for a size the hidden validation check rejects. */
376 ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size);
380 ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
/*
 * NOTE(review): elsewhere in this file ao2_ref() is called with two arguments
 * (object, delta). This call passes one argument and uses the result as a
 * pointer. Taking the reference with ao2_ref(pool, +1) and then storing pool
 * separately looks like what is meant; confirm against the astobj2 API.
 */
386 ssd->pool = ao2_ref(pool);
389 ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
/*
 * Create a threadpool: build its taskprocessor, take the pool object from the
 * taskprocessor listener's private data and queue a resize to initial_size.
 * The failure check and the return statement are not visible in this excerpt.
 * NOTE(review): the listener parameter is not used in any visible line.
 */
392 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
394 struct ast_threadpool *pool;
/*
 * NOTE(review): the type is written without the struct keyword and no typedef
 * named ast_taskprocessor is visible; struct ast_taskprocessor * looks
 * intended. The cleanup drops the taskprocessor reference when tps leaves scope.
 */
395 RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference);
401 pool = tps->listener->private_data;
403 ast_threadpool_set_size(pool, initial_size);