Merge res_timing_pthread. This is a timing interface for Asterisk that
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface 
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
29
30 #include <math.h>
31 #include <sys/select.h>
32
33 #include "asterisk/module.h"
34 #include "asterisk/timing.h"
35 #include "asterisk/utils.h"
36 #include "asterisk/astobj2.h"
37 #include "asterisk/time.h"
38 #include "asterisk/lock.h"
39
40 static void *timing_funcs_handle;
41
42 static int pthread_timer_open(void);
43 static void pthread_timer_close(int handle);
44 static int pthread_timer_set_rate(int handle, unsigned int rate);
45 static void pthread_timer_ack(int handle, unsigned int quantity);
46 static int pthread_timer_enable_continuous(int handle);
47 static int pthread_timer_disable_continuous(int handle);
48 static enum ast_timing_event pthread_timer_get_event(int handle);
49
50 static struct ast_timing_functions pthread_timing_functions = {
51         .timer_open = pthread_timer_open,
52         .timer_close = pthread_timer_close,
53         .timer_set_rate = pthread_timer_set_rate,
54         .timer_ack = pthread_timer_ack,
55         .timer_enable_continuous = pthread_timer_enable_continuous,
56         .timer_disable_continuous = pthread_timer_disable_continuous,
57         .timer_get_event = pthread_timer_get_event,
58 };
59
/*! Tick spacing in ms at the maximum rate (1 tick / 20 ms) */
#define TIMING_INTERVAL 20
/*! Highest rate, in ticks per second, that pthread_timer_set_rate() reports as supported */
#define MAX_RATE 50

/*! Container holding every open timer, keyed by the read end of its pipe */
static struct ao2_container *pthread_timers;
/*! Number of hash buckets requested when pthread_timers is allocated */
#define PTHREAD_TIMER_BUCKETS 563
66
/*! Indices into pthread_timer.pipe, in the order pipe() fills the array */
enum {
	PIPE_READ = 0,   /*!< end handed out as the timer handle */
	PIPE_WRITE = 1,  /*!< end the module writes tick bytes to */
};
71
/*! Operating mode of a single timer */
enum pthread_timer_state {
	/*! No rate configured; the timing thread skips the timer */
	TIMER_STATE_IDLE,
	/*! A rate is configured; the timing thread writes a byte per elapsed interval */
	TIMER_STATE_TICKING,
	/*! Continuous mode; a byte is left in the pipe until the mode is disabled */
	TIMER_STATE_CONTINUOUS,
};
77
78 struct pthread_timer {
79         int pipe[2];
80         enum pthread_timer_state state;
81         unsigned int rate;
82         /*! Interval in ms for current rate */
83         unsigned int interval;
84         unsigned int tick_count;
85         struct timeval start;
86 };
87
/* Internal helpers, defined further down in this file */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlink);
static void write_byte(int wr_fd);
static void read_pipe(int rd_fd, unsigned int quantity, int clear);
92
93 /*!
94  * \brief Data for the timing thread
95  */
96 static struct {
97         pthread_t thread;
98         ast_mutex_t lock;
99         ast_cond_t cond;
100         unsigned int stop:1;
101 } timing_thread;
102
103 static int pthread_timer_open(void)
104 {
105         struct pthread_timer *timer;
106
107         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
108                 errno = ENOMEM;
109                 return -1;
110         }
111
112         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
113         timer->state = TIMER_STATE_IDLE;
114
115         if (pipe(timer->pipe)) {
116                 ao2_ref(timer, -1);
117                 return -1;
118         }
119
120         ao2_lock(pthread_timers);
121         if (!ao2_container_count(pthread_timers)) {
122                 ast_mutex_lock(&timing_thread.lock);
123                 ast_cond_signal(&timing_thread.cond);
124                 ast_mutex_unlock(&timing_thread.lock);
125         }
126         ao2_link(pthread_timers, timer);
127         ao2_unlock(pthread_timers);
128
129         return timer->pipe[PIPE_READ];
130 }
131
/*!
 * \brief Close a timer
 *
 * Unlinks the timer matching \a handle from pthread_timers and releases the
 * reference the lookup returned.  Unknown handles are ignored.
 *
 * \param handle read end of the timer's pipe
 */
static void pthread_timer_close(int handle)
{
	struct pthread_timer *timer = find_timer(handle, 1);

	if (timer) {
		ao2_ref(timer, -1);
	}
}
142
143 static int pthread_timer_set_rate(int handle, unsigned int rate)
144 {
145         struct pthread_timer *timer;
146
147         if (!(timer = find_timer(handle, 0))) {
148                 errno = EINVAL;
149                 return -1;
150         }
151
152         if (rate > 0 && rate < MAX_RATE) {
153                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
154                         MAX_RATE);
155                 errno = EINVAL;
156                 return -1;
157         }
158
159         ao2_lock(timer);
160         timer->rate = rate;
161         timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
162         timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0;
163         timer->start = rate ? ast_tvnow() : ast_tv(0, 0);
164         timer->tick_count = 0;
165         ao2_unlock(timer);
166
167         ao2_ref(timer, -1);
168
169         return 0;
170 }
171
172 static void pthread_timer_ack(int handle, unsigned int quantity)
173 {
174         struct pthread_timer *timer;
175
176         ast_assert(quantity > 0);
177
178         if (!(timer = find_timer(handle, 0))) {
179                 return;
180         }
181
182         if (timer->state == TIMER_STATE_CONTINUOUS) {
183                 /* Leave the pipe alone, please! */
184                 return;
185         }
186
187         read_pipe(timer->pipe[PIPE_READ], quantity, 0);
188
189         ao2_ref(timer, -1);
190 }
191
192 static int pthread_timer_enable_continuous(int handle)
193 {
194         struct pthread_timer *timer;
195
196         if (!(timer = find_timer(handle, 0))) {
197                 errno = EINVAL;
198                 return -1;
199         }
200
201         ao2_lock(timer);
202         timer->state = TIMER_STATE_CONTINUOUS;
203         write_byte(timer->pipe[PIPE_WRITE]);
204         ao2_unlock(timer);
205
206         ao2_ref(timer, -1);
207
208         return 0;
209 }
210
211 static int pthread_timer_disable_continuous(int handle)
212 {
213         struct pthread_timer *timer;
214
215         if (!(timer = find_timer(handle, 0))) {
216                 errno = EINVAL;
217                 return -1;
218         }
219
220         ao2_lock(timer);
221         timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
222         read_pipe(timer->pipe[PIPE_READ], 0, 1);
223         ao2_unlock(timer);
224
225         ao2_ref(timer, -1);
226
227         return 0;
228 }
229
230 static enum ast_timing_event pthread_timer_get_event(int handle)
231 {
232         struct pthread_timer *timer;
233         enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
234
235         if (!(timer = find_timer(handle, 0))) {
236                 return res;
237         }
238
239         if (timer->state == TIMER_STATE_CONTINUOUS) {
240                 res = AST_TIMING_EVENT_CONTINUOUS;
241         }
242
243         ao2_ref(timer, -1);
244
245         return res;
246 }
247
248 static struct pthread_timer *find_timer(int handle, int unlink)
249 {
250         struct pthread_timer *timer;
251         struct pthread_timer tmp_timer;
252         int flags = OBJ_POINTER;
253
254         tmp_timer.pipe[PIPE_READ] = handle;
255
256         if (unlink) {
257                 flags |= OBJ_UNLINK;
258         }
259
260         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
261                 ast_assert(timer != NULL);
262                 return NULL;
263         }
264
265         return timer;
266 }
267
268 static void pthread_timer_destructor(void *obj)
269 {
270         struct pthread_timer *timer = obj;
271
272         if (timer->pipe[PIPE_READ] > -1) {
273                 close(timer->pipe[PIPE_READ]);
274                 timer->pipe[PIPE_READ] = -1;
275         }
276
277         if (timer->pipe[PIPE_WRITE] > -1) {
278                 close(timer->pipe[PIPE_WRITE]);
279                 timer->pipe[PIPE_WRITE] = -1;
280         }
281 }
282
283 /*!
284  * \note only PIPE_READ is guaranteed valid 
285  */
286 static int pthread_timer_hash(const void *obj, const int flags)
287 {
288         const struct pthread_timer *timer = obj;
289
290         return timer->pipe[PIPE_READ];
291 }
292
293 /*!
294  * \note only PIPE_READ is guaranteed valid 
295  */
296 static int pthread_timer_cmp(void *obj, void *arg, int flags)
297 {
298         struct pthread_timer *timer1 = obj, *timer2 = arg;
299
300         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH : 0;
301 }
302
303 /*!
304  * \retval 0 no timer tick needed
305  * \retval non-zero write to the timing pipe needed
306  */
307 static int check_timer(struct pthread_timer *timer)
308 {
309         struct timeval now;
310
311         if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
312                 return 0;       
313         }
314         
315         now = ast_tvnow();
316
317         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
318                 timer->tick_count++;
319                 if (!timer->tick_count) {
320                         timer->start = now;
321                 }
322                 return 1;
323         }
324
325         return 0;
326 }
327
328 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
329 {
330
331         ast_assert(quantity || clear);
332
333         if (!quantity && clear) {
334                 quantity = 1;
335         }
336
337         do {
338                 unsigned char buf[1024];
339                 ssize_t res;
340                 fd_set rfds;
341                 struct timeval tv = {
342                         .tv_sec = 0,
343                 };
344
345                 /* Make sure there is data to read */
346                 FD_ZERO(&rfds);
347                 FD_SET(rd_fd, &rfds);
348
349                 if (select(rd_fd + 1, &rfds, NULL, NULL, &tv) != 1) {
350                         break;
351                 }
352
353                 res = read(rd_fd, buf, 
354                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
355
356                 if (res == -1) {
357                         if (errno == EAGAIN) {
358                                 continue;
359                         }
360                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
361                         break;
362                 }
363
364                 if (clear) {
365                         continue;
366                 }
367
368                 quantity -= res;
369         } while (quantity);
370 }
371
372 static void write_byte(int wr_fd)
373 {
374         do {
375                 ssize_t res;
376                 unsigned char x = 42;
377
378                 res = write(wr_fd, &x, 1); 
379
380                 if (res == -1) {
381                         if (errno == EAGAIN) {
382                                 continue;
383                         }
384                         ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
385                 }
386         } while (0);
387 }
388
389 static int run_timer(void *obj, void *arg, int flags)
390 {
391         struct pthread_timer *timer = obj;
392
393         if (timer->state == TIMER_STATE_IDLE) {
394                 return 0;
395         }
396
397         ao2_lock(timer);
398
399         if (check_timer(timer)) {
400                 write_byte(timer->pipe[PIPE_WRITE]);
401         }
402         
403         ao2_unlock(timer);
404
405         return 0;
406 }
407
408 static void *do_timing(void *arg)
409 {
410         struct timeval next_wakeup = ast_tvnow();
411
412         while (!timing_thread.stop) {
413                 struct timespec ts = { 0, };
414
415                 ao2_callback(pthread_timers, 0, run_timer, NULL);
416
417                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 10000));
418
419                 ts.tv_sec = next_wakeup.tv_sec;
420                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
421
422                 ast_mutex_lock(&timing_thread.lock);
423                 if (!timing_thread.stop) {
424                         if (ao2_container_count(pthread_timers)) {
425                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
426                         } else {
427                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
428                         }
429                 }
430                 ast_mutex_unlock(&timing_thread.lock);
431         }
432
433         return NULL;
434 }
435
436 static int init_timing_thread(void)
437 {
438         ast_mutex_init(&timing_thread.lock);
439         ast_cond_init(&timing_thread.cond, NULL);
440
441         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
442                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
443                 return -1;
444         }
445
446         return 0;
447 }
448
449 static int load_module(void)
450 {
451         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
452                 pthread_timer_hash, pthread_timer_cmp))) {
453                 return AST_MODULE_LOAD_DECLINE;
454         }
455
456         if (init_timing_thread()) {
457                 ao2_ref(pthread_timers, -1);
458                 pthread_timers = NULL;
459                 return AST_MODULE_LOAD_DECLINE;
460         }
461
462         return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
463                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
464 }
465
466 static int unload_module(void)
467 {
468 #if 0
469         /* XXX code to stop the timing thread ... */
470
471         ast_uninstall_timing_functions(timing_funcs_handle);
472         ao2_ref(pthread_timers, -1);
473 #endif
474
475         /* This module can not currently be unloaded.  No use count handling is being done. */
476
477         return -1;
478 }
479
480 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");