/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2012, Digium, Inc.
 *
 * Mark Michelson <mmmichelson@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */


#include "asterisk.h"

#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"

#define THREAD_BUCKETS 89

static int id_counter;

struct ast_threadpool {
        struct ast_threadpool_listener *listener;
        struct ao2_container *active_threads;
        struct ao2_container *idle_threads;
        struct ao2_container *zombie_threads;
        struct ast_taskprocessor *tps;
        struct ast_taskprocessor *control_tps;
};

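/*!
 * \brief States in a worker thread's lifecycle.
 *
 * A worker is ALIVE while it executes tasks or waits for work. When the pool
 * shrinks, active workers are moved to the zombie container and marked ZOMBIE
 * so they can finish up and then remove themselves from the pool. A DEAD
 * worker simply exits its loop so that its thread can be joined.
 */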
enum worker_state {
        ALIVE,
        ZOMBIE,
        DEAD,
};

struct worker_thread {
        int id;
        ast_cond_t cond;
        ast_mutex_t lock;
        pthread_t thread;
        struct ast_threadpool *pool;
        enum worker_state state;
        int wake_up;
};

static int worker_thread_hash(const void *obj, int flags)
{
        const struct worker_thread *worker = obj;

        return worker->id;
}

static int worker_thread_cmp(void *obj, void *arg, int flags)
{
        struct worker_thread *worker1 = obj;
        struct worker_thread *worker2 = arg;

        return worker1->id == worker2->id ? CMP_MATCH : 0;
}

static void worker_thread_destroy(void *obj)
{
        struct worker_thread *worker = obj;
        ast_mutex_destroy(&worker->lock);
        ast_cond_destroy(&worker->cond);
}

static int worker_active(struct worker_thread *worker);

static void *worker_start(void *arg)
{
        struct worker_thread *worker = arg;

        worker_active(worker);
        return NULL;
}

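/*!
 * \brief Allocate a worker and start its thread.
 *
 * The worker begins life in the ALIVE state and its thread is started
 * immediately via ast_pthread_create(). Returns NULL on allocation or
 * thread-creation failure.
 */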
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
        struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
        if (!worker) {
                /* Allocation failed */
                return NULL;
        }
        worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
        ast_mutex_init(&worker->lock);
        ast_cond_init(&worker->cond, NULL);
        worker->pool = pool;
        worker->thread = AST_PTHREADT_NULL;
        worker->state = ALIVE;
        if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
                /* Thread creation failed; drop the worker */
                ao2_ref(worker, -1);
                return NULL;
        }
        return worker;
}

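/*!
 * \brief Report the current number of active, idle, and zombie threads to
 * the pool's listener.
 *
 * Its callers all run from the pool's control taskprocessor.
 */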
static void threadpool_send_state_changed(struct ast_threadpool *pool)
{
        int active_size = ao2_container_count(pool->active_threads);
        int idle_size = ao2_container_count(pool->idle_threads);
        int zombie_size = ao2_container_count(pool->zombie_threads);

        pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
}

struct thread_worker_pair {
        struct ast_threadpool *pool;
        struct worker_thread *worker;
};

static void thread_worker_pair_destructor(void *obj)
{
        struct thread_worker_pair *pair = obj;
        ao2_ref(pair->pool, -1);
        ao2_ref(pair->worker, -1);
}

static struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
                struct worker_thread *worker)
{
        struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
        if (!pair) {
                /* Allocation failed */
                return NULL;
        }
        ao2_ref(pool, +1);
        pair->pool = pool;
        ao2_ref(worker, +1);
        pair->worker = worker;
        return pair;
}

static int queued_active_thread_idle(void *data)
{
        struct thread_worker_pair *pair = data;

        ao2_link(pair->pool->idle_threads, pair->worker);
        ao2_unlink(pair->pool->active_threads, pair->worker);

        threadpool_send_state_changed(pair->pool);

        ao2_ref(pair, -1);
        return 0;
}

static void threadpool_active_thread_idle(struct ast_threadpool *pool,
                struct worker_thread *worker)
{
        struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
        if (!pair) {
                /* Allocation failed */
                return;
        }
        ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
}

static int queued_zombie_thread_dead(void *data)
{
        struct thread_worker_pair *pair = data;

        ao2_unlink(pair->pool->zombie_threads, pair->worker);
        threadpool_send_state_changed(pair->pool);

        ao2_ref(pair, -1);
        return 0;
}

static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
                struct worker_thread *worker)
{
        struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
        if (!pair) {
                /* Allocation failed */
                return;
        }
        ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
}

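/*!
 * \brief Place a worker in the pool's idle container and wait to be woken.
 *
 * \retval non-zero The worker was woken and is still ALIVE, so it should go
 *         back to looking for work.
 * \retval 0 The worker has been marked ZOMBIE or DEAD and should stop looping.
 */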
static int worker_idle(struct worker_thread *worker)
{
        SCOPED_MUTEX(lock, &worker->lock);
        if (worker->state != ALIVE) {
                return 0;
        }
        threadpool_active_thread_idle(worker->pool, worker);
        while (!worker->wake_up) {
                ast_cond_wait(&worker->cond, lock);
        }
        worker->wake_up = 0;
        return worker->state == ALIVE;
}

static int threadpool_execute(struct ast_threadpool *pool)
{
        return ast_taskprocessor_execute(pool->tps);
}

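/*!
 * \brief Main loop for a worker thread.
 *
 * The worker alternates between executing tasks from the pool's taskprocessor
 * and idling when the queue runs dry, until it is told to die. A worker that
 * dies as a ZOMBIE notifies the pool so it can be removed from the zombie
 * container.
 */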
static int worker_active(struct worker_thread *worker)
{
        int alive = 1;
        while (alive) {
                /* Only go idle once the taskprocessor has no more work for us */
                if (!threadpool_execute(worker->pool)) {
                        alive = worker_idle(worker);
                }
        }

        /* Reaching this portion means the thread is
         * on death's door. It may have been killed while
         * it was idle, in which case it can just die
         * peacefully. If it's a zombie, though, then
         * it needs to let the pool know so
         * that the thread can be removed from the
         * list of zombie threads.
         */
        if (worker->state == ZOMBIE) {
                threadpool_zombie_thread_dead(worker->pool, worker);
        }

        return 0;
}

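/*!
 * \brief Change a worker's state and wake it if it is waiting in worker_idle().
 *
 * The wake_up flag is set under the worker's lock so the idle wait loop is
 * not fooled by spurious wakeups.
 */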
static void worker_set_state(struct worker_thread *worker, enum worker_state state)
{
        SCOPED_MUTEX(lock, &worker->lock);
        worker->state = state;
        worker->wake_up = 1;
        ast_cond_signal(&worker->cond);
}

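/*!
 * \brief ao2 callback used at pool shutdown: mark a worker DEAD and join its thread.
 */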
static int worker_shutdown(void *obj, void *arg, int flags)
{
        struct worker_thread *worker = obj;

        worker_set_state(worker, DEAD);
        if (worker->thread != AST_PTHREADT_NULL) {
                pthread_join(worker->thread, NULL);
                worker->thread = AST_PTHREADT_NULL;
        }
        return 0;
}

static void threadpool_tps_listener_destroy(void *private_data)
{
        struct ast_threadpool *pool = private_data;
        /* XXX Probably should let the listener know we're being destroyed? */

        /* Threads should all be shut down by now, so this should be a painless
         * operation
         */
        ao2_ref(pool->active_threads, -1);
        ao2_ref(pool->idle_threads, -1);
        ao2_ref(pool->zombie_threads, -1);
        ao2_ref(pool->listener, -1);
        ao2_ref(pool, -1);
}

static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
{
        RAII_VAR(struct ast_threadpool *, pool,
                        ao2_alloc(sizeof(*pool), threadpool_tps_listener_destroy), ao2_cleanup);

        if (!pool) {
                return NULL;
        }
        pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
        if (!pool->control_tps) {
                return NULL;
        }
        pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
        if (!pool->active_threads) {
                return NULL;
        }
        pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
        if (!pool->idle_threads) {
                return NULL;
        }
        pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
        if (!pool->zombie_threads) {
                return NULL;
        }

        pool->tps = listener->tps;

        ao2_ref(pool, +1);
        return pool;
}

struct task_pushed_data {
        struct ast_threadpool *pool;
        int was_empty;
};

static void task_pushed_data_destroy(void *obj)
{
        struct task_pushed_data *tpd = obj;
        ao2_ref(tpd->pool, -1);
}

static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
                int was_empty)
{
        struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd),
                        task_pushed_data_destroy);

        if (!tpd) {
                return NULL;
        }
        ao2_ref(pool, +1);
        tpd->pool = pool;
        tpd->was_empty = was_empty;
        return tpd;
}

static int activate_threads(void *obj, void *arg, int flags)
{
        struct worker_thread *worker = obj;
        struct ast_threadpool *pool = arg;

        ao2_link(pool->active_threads, worker);
        worker_set_state(worker, ALIVE);
        /* Return CMP_MATCH so OBJ_UNLINK actually removes the worker from the
         * idle container now that it has been linked into the active container.
         */
        return CMP_MATCH;
}

static int handle_task_pushed(void *data)
{
        struct task_pushed_data *tpd = data;
        struct ast_threadpool *pool = tpd->pool;
        int was_empty = tpd->was_empty;

        pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
        ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE,
                        activate_threads, pool);
        ao2_ref(tpd, -1);
        return 0;
}

static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
                int was_empty)
{
        struct ast_threadpool *pool = listener->private_data;
        struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);

        if (!tpd) {
                /* Allocation failed */
                return;
        }

        ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
}

static int handle_emptied(void *data)
{
        struct ast_threadpool *pool = data;

        pool->listener->callbacks->emptied(pool->listener);
        ao2_ref(pool, -1);
        return 0;
}

static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
        struct ast_threadpool *pool = listener->private_data;

        ao2_ref(pool, +1);
        ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
}

static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
        /*
         * The threadpool triggers the taskprocessor to shut down. As a result,
         * we have the freedom of shutting things down in three stages:
         *
         * 1) Before the taskprocessor is shut down
         * 2) During taskprocessor shutdown (here)
         * 3) After taskprocessor shutdown
         *
         * In the spirit of the taskprocessor shutdown, this would be
         * where we make sure that all the worker threads are no longer
         * executing. We could just do this before we even shut down
         * the taskprocessor, but this feels more "right".
         */

        struct ast_threadpool *pool = listener->private_data;
        ao2_callback(pool->active_threads, 0, worker_shutdown, NULL);
        ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL);
        ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL);
}

static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
        .alloc = threadpool_tps_listener_alloc,
        .task_pushed = threadpool_tps_task_pushed,
        .emptied = threadpool_tps_emptied,
        .shutdown = threadpool_tps_shutdown,
        .destroy = threadpool_tps_listener_destroy,
};

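/*!
 * \brief Create delta new worker threads and link them into the active container.
 */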
static void grow(struct ast_threadpool *pool, int delta)
{
        int i;
        for (i = 0; i < delta; ++i) {
                struct worker_thread *worker = worker_thread_alloc(pool);
                if (!worker) {
                        /* Allocation failed; give up on the remaining threads */
                        return;
                }
                ao2_link(pool->active_threads, worker);
                /* The container now holds its own reference to the worker */
                ao2_ref(worker, -1);
        }
}

static int kill_threads(void *obj, void *arg, int flags)
{
        struct worker_thread *worker = obj;
        int *num_to_kill = arg;

        if ((*num_to_kill)-- > 0) {
                worker_shutdown(worker, arg, flags);
                return CMP_MATCH;
        } else {
                return CMP_STOP;
        }
}

static int zombify_threads(void *obj, void *arg, void *data, int flags)
{
        struct worker_thread *worker = obj;
        struct ast_threadpool *pool = arg;
        int *num_to_zombify = data;

        if ((*num_to_zombify)-- > 0) {
                ao2_link(pool->zombie_threads, worker);
                worker_set_state(worker, ZOMBIE);
                return CMP_MATCH;
        } else {
                return CMP_STOP;
        }
}

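/*!
 * \brief Dispose of delta worker threads, preferring idle threads over active ones.
 */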
static void shrink(struct ast_threadpool *pool, int delta)
{
        /*
         * Preference is to kill idle threads, but
         * we'll move on to zombifying active threads
         * if we have to
         */
        int idle_threads = ao2_container_count(pool->idle_threads);
        int idle_threads_to_kill = MIN(delta, idle_threads);
        int active_threads_to_zombify = delta - idle_threads_to_kill;

        ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
                        kill_threads, &idle_threads_to_kill);

        ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
                        zombify_threads, pool, &active_threads_to_zombify);
}

struct set_size_data {
        struct ast_threadpool *pool;
        int size;
};

static void set_size_data_destroy(void *obj)
{
        struct set_size_data *ssd = obj;
        ao2_ref(ssd->pool, -1);
}

static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
                int size)
{
        struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
        if (!ssd) {
                /* Allocation failed */
                return NULL;
        }

        ao2_ref(pool, +1);
        ssd->pool = pool;
        ssd->size = size;
        return ssd;
}

static int queued_set_size(void *data)
{
        struct set_size_data *ssd = data;
        struct ast_threadpool *pool = ssd->pool;
        int num_threads = ssd->size;

        /* We don't count zombie threads as being "live" when potentially resizing */
        int current_size = ao2_container_count(pool->active_threads) +
                ao2_container_count(pool->idle_threads);

        if (current_size == num_threads) {
                /* Nothing to do, but don't forget to release the task data */
                ao2_ref(ssd, -1);
                return 0;
        }

        if (current_size < num_threads) {
                grow(pool, num_threads - current_size);
        } else {
                shrink(pool, current_size - num_threads);
        }

        threadpool_send_state_changed(pool);
        ao2_ref(ssd, -1);
        return 0;
}

void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
        struct set_size_data *ssd;

        ssd = set_size_data_alloc(pool, size);
        if (!ssd) {
                /* Allocation failed */
                return;
        }

        ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
}

struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
{
        struct ast_threadpool *pool;
        struct ast_taskprocessor *tps;
        RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
                        ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
                        ao2_cleanup);

        if (!tps_listener) {
                return NULL;
        }

        tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);

        if (!tps) {
                return NULL;
        }

        pool = tps_listener->private_data;

        /* The pool keeps its own reference to the threadpool listener so that
         * the listener callbacks can be invoked later on.
         */
        ao2_ref(listener, +1);
        pool->listener = listener;

        ast_threadpool_set_size(pool, initial_size);
        return pool;
}

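/*
 * A rough usage sketch for the public entry points in this file. It assumes
 * the caller has already built an ast_threadpool_listener with appropriate
 * callbacks; how that listener is allocated is not shown here, and how tasks
 * are ultimately fed to the pool's taskprocessor is left out as well.
 *
 *     struct ast_threadpool *pool = ast_threadpool_create(listener, 5);
 *
 *     if (!pool) {
 *         return -1;
 *     }
 *
 *     ast_threadpool_set_size(pool, 10);
 *
 *     ... listener callbacks report state changes, pushed tasks, etc. ...
 *
 *     ast_threadpool_shutdown(pool);
 */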
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
        /* Pretty simple really. We just shut down the
         * taskprocessors and everything else just
         * takes care of itself via the taskprocessor callbacks
         */
        ast_taskprocessor_unreference(pool->control_tps);
        ast_taskprocessor_unreference(pool->tps);
}