Some more progress.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24
25 #define THREAD_BUCKETS 89
26
27 static int id_counter;
28
29 struct ast_threadpool {
30         struct ast_threadpool_listener *threadpool_listener;
31         struct ao2_container *active_threads;
32         struct ao2_container *idle_threads;
33         struct ao2_container *zombie_threads;
34 }
35
36 enum worker_state {
37         ALIVE,
38         ZOMBIE,
39         DEAD,
40 };
41
42 struct worker_thread {
43         int id;
44         ast_cond_t cond;
45         ast_mutex_t lock;
46         pthread_t thread;
47         struct ast_threadpool *pool;
48         enum worker_state state;
49         int wake_up;
50 };
51
52 static int worker_thread_hash(const void *obj)
53 {
54         struct worker_thread *worker= obj;
55
56         return worker->id;
57 }
58
59 static int worker_thread_cmp(void *obj, void *arg, int flags)
60 {
61         struct worker_thread *worker1 = obj;
62         struct worker_thread *worker2 = arg;
63
64         return worker1->id == worker2->id ? CMP_MATCH : 0;
65 }
66
67 static worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
68 {
69         struct worker_thread *worker = ao2_alloc(1, sizeof(*worker));
70         if (!worker) {
71                 /* XXX Dangit! */
72                 return NULL;
73         }
74         worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
75         ast_mutex_init(&worker->lock);
76         ast_cond_init(&worker->cond, NULL);
77         worker->pool = pool;
78         worker->thread = AST_PTHREADT_NULL;
79         worker->state = ALIVE;
80         if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) {
81                 /* XXX Poop! */
82                 ao2_ref(worker, -1);
83                 return NULL;
84         }
85         return worker;
86 }
87
88 static void threadpool_send_state_changed(struct ast_threadpool *pool)
89 {
90         int active_size = ao2_container_count(pool->active_threads);
91         int idle_size = ao2_container_count(pool->idle_threads);
92         int zombie_size = ao2_container_count(pool->zombie_threads);
93
94         pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
95 }
96
97 struct thread_worker_pair {
98         struct ast_threadpool *pool;
99         struct worker_thread *worker;
100 };
101
102 static void thread_worker_pair_destructor(void *obj)
103 {
104         struct thread_worker_pair *pair = obj;
105         ao2_ref(pair->pool, -1);
106         ao2_ref(pair->worker, -1);
107 }
108
109 struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
110                 struct worker_thread *worker)
111 {
112         struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
113         if (!pair) {
114                 /*XXX Crap */
115                 return NULL;
116         }
117         pair->pool = ao2_ref(pool);
118         pair->worker = ao2_ref(worker);
119         return pair;
120 }
121
122 static int queued_active_thread_idle(void *data)
123 {
124         struct thread_worker_pair *pair = data;
125
126         ao2_link(pair->pool->idle_threads, pair->worker);
127         ao2_unlink(pair->pool->active_threads, pair->worker);
128
129         threadpool_send_state_changed(pair->pool);
130
131         ao2_ref(pair, -1);
132         return 0;
133 }
134
135 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
136                 struct worker_thread *worker)
137 {
138         struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
139         if (!pair) {
140                 /*XXX Crap */
141                 return;
142         }
143         ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair));
144 }
145
146 static int queued_zombie_thread_dead(void *data)
147 {
148         struct thread_worker_pair *pair = data;
149
150         ao2_unlink(pair->pool->zombie_threads, pair->worker);
151         threadpool_send_state_changed(pair->pool);
152
153         ao2_ref(pair, -1);
154         return 0;
155 }
156
157 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
158                 struct worker_thread *worker)
159 {
160         struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
161         if (!pair) {
162                 /* XXX Crap */
163                 return;
164         }
165         ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair));
166 }
167
168 static int worker_idle(struct worker_thread *worker)
169 {
170         SCOPED_MUTEX(lock, &worker->lock);
171         if (worker->state != ALIVE) {
172                 return false;
173         }
174         threadpool_active_thread_idle(worker->pool, worker);
175         while (!worker->wake_up) {
176                 ast_cond_wait(&worker->cond, lock);
177         }
178         worker->wake_up = false;
179         return worker->state == ALIVE;
180 }
181
182 static int worker_active(struct worker_thread *worker)
183 {
184         int alive = 1;
185         while (alive) {
186                 if (threadpool_execute(worker->pool)) {
187                         alive = worker_idle(worker);
188                 }
189         }
190
191         /* Reaching this portion means the thread is
192          * on death's door. It may have been killed while
193          * it was idle, in which case it can just die
194          * peacefully. If it's a zombie, though, then
195          * it needs to let the pool know so
196          * that the thread can be removed from the
197          * list of zombie threads.
198          */
199         if (worker->state == ZOMBIE) {
200                 threadpool_zombie_thread_dead(worker->pool, worker);
201         }
202
203         return 0;
204 }
205
206
207 static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
208 {
209         RAII_VAR(ast_threadpool *, pool,
210                         ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup);
211
212         pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT);
213         if (!pool->control_tps) {
214                 return NULL;
215         }
216         pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
217         if (!pool->active_threads) {
218                 return NULL;
219         }
220         pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
221         if (!pool->idle_threads) {
222                 return NULL;
223         }
224         pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
225         if (!pool->zombie_thread) {
226                 return NULL;
227         }
228
229         ao2_ref(pool, +1);
230         return pool;
231 }
232
233 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener)
234 {
235         /* XXX stub */
236 }
237
238 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
239 {
240         /* XXX stub */
241 }
242
243 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
244 {
245         /* XXX stub */
246 }
247
248 static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener)
249 {
250         /* XXX stub */
251 }
252
253 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
254         .alloc = threadpool_tps_listener_alloc,
255         .task_pushed = threadpool_tps_task_pushed,
256         .emptied = threadpool_tps_emptied,
257         .shutdown = threadpool_tps_shutdown,
258         .destroy = threadpool_tps_listener_destroy,
259 };
260
261 /*!
262  * \brief Allocate the taskprocessor to be used for the threadpool
263  *
264  * We use a custom taskprocessor listener. We allocate our custom
265  * listener and then create a taskprocessor.
266  */
267 static struct ast_taskprocessor_listener *threadpool_tps_alloc(void)
268 {
269         RAII_VAR(struct threadpool_tps_listener *, tps_listener,
270                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
271                         ao2_cleanup);
272
273         if (!tps_listener) {
274                 return NULL;
275         }
276
277         return ast_taskprocessor_create_with_listener(tps_listener);
278 }
279
280 static void grow(struct ast_threadpool *pool, int delta)
281 {
282         int i;
283         for (i = 0; i < delta; ++i) {
284                 struct worker_thread *worker = worker_thread_alloc(pool);
285                 if (!worker) {
286                         /* XXX Abandon */
287                         return;
288                 }
289                 ao2_link(pool->active_threads, worker);
290         }
291 }
292
293 static int kill_threads(void *obj, void *arg, int flags)
294 {
295         int *num_to_kill = arg;
296
297         if ((*num_to_kill)-- > 0) {
298                 return CMP_MATCH;
299         } else {
300                 return CMP_STOP;
301         }
302 }
303
304 static int zombify_threads(void *obj, void *arg, void *data, int flags)
305 {
306         struct worker_thread *worker = obj;
307         struct ast_threadpool *pool = arg;
308         int *num_to_zombify = data;
309
310         if ((*num_to_zombify)-- > 0) {
311                 ao2_link(pool->zombie_threads, worker);
312                 return CMP_MATCH;
313         } else {
314                 return CMP_STOP;
315         }
316 }
317
318 static void shrink(struct ast_threadpool *pool, int delta)
319 {
320         /* 
321          * Preference is to kill idle threads, but
322          * we'll move on to deactivating active threads
323          * if we have to
324          */
325         int idle_threads = ao2_container_count(pool->idle_threads);
326         int idle_threads_to_kill = MIN(delta, idle_threads);
327         int active_threads_to_zombify = delta - idle_threads_to_kill;
328         int i = 0;
329
330         ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
331                         kill_threads, &idle_threads_to_kill);
332
333         ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
334                         zombify_threads, pool, &active_threads_to_zombify);
335 }
336
337 struct set_size_data {
338         struct threadpool *pool;
339         int size;
340 };
341
342 void set_size_data_destroy(void *obj)
343 {
344         struct set_size_data *ssd = obj;
345         ao2_ref(ssd->pool, -1);
346 }
347
348 static int queued_set_size(void *data)
349 {
350         struct set_size_data *ssd = data;
351         struct ast_threadpool *pool = ssd->pool;
352         int num_threads = ssd->size;
353
354         /* We don't count zombie threads as being "live when potentially resizing */
355         int current_size = ao2_container_count(pool->active_threads) +
356                 ao2_container_count(pool->idle_threads);
357
358         if (current_size = num_threads) {
359                 return 0;
360         }
361
362         if (current_size < num_threads) {
363                 grow(pool, num_threads - current_size);
364         } else {
365                 shrink(pool, current_size - num_threads);
366         }
367
368         threadpool_send_state_changed(pool);
369         ao2_ref(set_size_data, -1);
370 }
371
372 void ast_threadpool_set_size(struct ast_threadpool *pool, int size)
373 {
374         struct set_size_data *ssd;
375         if (size < 0) {
376                 ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size);
377                 return;
378         }
379
380         ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
381         if (!ssd) {
382                 /* XXX Crap */
383                 return;
384         }
385
386         ssd->pool = ao2_ref(pool);
387         ssd->size = size;
388
389         ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
390 }
391
392 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
393 {
394         struct ast_threadpool *pool;
395         RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference);
396
397         if (!tps) {
398                 return NULL;
399         }
400
401         pool = tps->listener->private_data;
402         pool->tps = tps;
403         ast_threadpool_set_size(pool, initial_size);
404
405         return pool;
406 }