Commit some progress towards threadpools.
[asterisk/asterisk.git] / main / threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19
20 #include "asterisk.h"
21
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24
/* Forward declaration: worker threads hold a pointer back to their owning pool. */
struct ast_threadpool;
26
/*!
 * \brief States a worker thread can be in
 */
enum worker_state {
	/*! The worker keeps executing tasks and going idle only while in this state */
	ALIVE = 0,
	/*! On exit the worker must report to the pool via threadpool_zombie_thread_dead() */
	ZOMBIE = 1,
	/*! The worker exits without notifying the pool (presumably killed while idle) */
	DEAD = 2,
};
32
33 struct worker_thread {
34         ast_cond_t cond;
35         ast_mutex_t lock;
36         pthread_t thread;
37         struct ast_threadpool *pool;
38         AST_LIST_ENTRY(struct worker_thread) next;
39         int wake_up;
40         enum worker_state state;
41 };
42
43 static int worker_idle(struct worker_thread *worker)
44 {
45         SCOPED_MUTEX(lock, &worker->lock);
46         if (worker->state != ALIVE) {
47                 return false;
48         }
49         threadpool_active_thread_idle(worker->pool, worker);
50         while (!worker->wake_up) {
51                 ast_cond_wait(&worker->cond, lock);
52         }
53         worker->wake_up = false;
54         return worker->state == ALIVE;
55 }
56
57 static int worker_active(struct worker_thread *worker)
58 {
59         int alive = 1;
60         while (alive) {
61                 if (threadpool_execute(worker->pool)) {
62                         alive = worker_idle(worker);
63                 }
64         }
65
66         /* Reaching this portion means the thread is
67          * on death's door. It may have been killed while
68          * it was idle, in which case it can just die
69          * peacefully. If it's a zombie, though, then
70          * it needs to let the pool know so
71          * that the thread can be removed from the
72          * list of zombie threads.
73          */
74         if (worker->state == ZOMBIE) {
75                 threadpool_zombie_thread_dead(worker->pool, worker);
76         }
77
78         return 0;
79 }
80
/*!
 * \brief A pool of worker threads
 */
struct ast_threadpool {
	/*! Listener associated with this pool */
	struct ast_threadpool_listener *threadpool_listener;
	/*! Taskprocessor backing this pool; assigned in ast_threadpool_create() */
	struct ast_taskprocessor *tps;
	/*! Count of active threads */
	int active_threads;
	/*! Count of idle threads */
	int idle_threads;
	/*! Count of zombie threads */
	int zombie_threads;
};
87
88 static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
89 {
90         RAII_VAR(ast_threadpool *, threadpool,
91                         ao2_alloc(sizeof(*threadpool), threadpool_destroy), ao2_cleanup);
92
93         return threadpool;
94 }
95
/*!
 * \brief Taskprocessor listener "task_pushed" callback
 *
 * Not implemented yet; currently does nothing.
 *
 * \param listener The taskprocessor listener (unused)
 */
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener)
{
	/* XXX stub */
}
100
/*!
 * \brief Taskprocessor listener "emptied" callback
 *
 * Not implemented yet; currently does nothing.
 *
 * \param listener The taskprocessor listener (unused)
 */
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
	/* XXX stub */
}
105
/*!
 * \brief Taskprocessor listener "shutdown" callback
 *
 * Not implemented yet; currently does nothing.
 *
 * \param listener The taskprocessor listener (unused)
 */
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
	/* XXX stub */
}
110
/*!
 * \brief Taskprocessor listener "destroy" callback
 *
 * Not implemented yet; currently does nothing.
 *
 * \param listener The taskprocessor listener (unused)
 */
static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener)
{
	/* XXX stub */
}
115
116 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
117         .alloc = threadpool_tps_listener_alloc,
118         .task_pushed = threadpool_tps_task_pushed,
119         .emptied = threadpool_tps_emptied,
120         .shutdown = threadpool_tps_shutdown,
121         .destroy = threadpool_tps_listener_destroy,
122 };
123
124 /*!
125  * \brief Allocate the taskprocessor to be used for the threadpool
126  *
127  * We use a custom taskprocessor listener. We allocate our custom
128  * listener and then create a taskprocessor.
129  */
130 static struct ast_taskprocessor_listener *threadpool_tps_alloc(void)
131 {
132         RAII_VAR(struct threadpool_tps_listener *, tps_listener,
133                         ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
134                         ao2_cleanup);
135
136         if (!tps_listener) {
137                 return NULL;
138         }
139
140         return ast_taskprocessor_create_with_listener(tps_listener);
141 }
142
/*!
 * \brief Set the number of threads in a threadpool
 *
 * Not implemented yet: both arguments are currently ignored.
 *
 * \param pool The threadpool to resize
 * \param size The requested size
 */
void ast_threadpool_set_size(struct ast_threadpool *pool, int size)
{
	/* XXX stub */
}
146
147 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
148 {
149         struct ast_threadpool *pool;
150         RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference);
151
152         if (!tps) {
153                 return NULL;
154         }
155
156         pool = tps->listener->private_data;
157         pool->tps = tps;
158         ast_threadpool_set_size(pool, initial_size);
159
160         return pool;
161 }