Add ability to pass arbitrary data to the ao2_callback_fn (called from
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface 
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
29
30 #include <math.h>
31 #include <sys/select.h>
32
33 #include "asterisk/module.h"
34 #include "asterisk/timing.h"
35 #include "asterisk/utils.h"
36 #include "asterisk/astobj2.h"
37 #include "asterisk/time.h"
38 #include "asterisk/lock.h"
39
40 static void *timing_funcs_handle;
41
42 static int pthread_timer_open(void);
43 static void pthread_timer_close(int handle);
44 static int pthread_timer_set_rate(int handle, unsigned int rate);
45 static void pthread_timer_ack(int handle, unsigned int quantity);
46 static int pthread_timer_enable_continuous(int handle);
47 static int pthread_timer_disable_continuous(int handle);
48 static enum ast_timing_event pthread_timer_get_event(int handle);
49 static unsigned int pthread_timer_get_max_rate(int handle);
50
51 static struct ast_timing_functions pthread_timing_functions = {
52         .timer_open = pthread_timer_open,
53         .timer_close = pthread_timer_close,
54         .timer_set_rate = pthread_timer_set_rate,
55         .timer_ack = pthread_timer_ack,
56         .timer_enable_continuous = pthread_timer_enable_continuous,
57         .timer_disable_continuous = pthread_timer_disable_continuous,
58         .timer_get_event = pthread_timer_get_event,
59         .timer_get_max_rate = pthread_timer_get_max_rate,
60 };
61
62 /* 1 tick / 10 ms */
63 #define MAX_RATE 100
64
65 static struct ao2_container *pthread_timers;
66 #define PTHREAD_TIMER_BUCKETS 563
67
68 enum {
69         PIPE_READ =  0,
70         PIPE_WRITE = 1
71 };
72
73 enum pthread_timer_state {
74         TIMER_STATE_IDLE,
75         TIMER_STATE_TICKING,
76         TIMER_STATE_CONTINUOUS,
77 };
78
79 struct pthread_timer {
80         int pipe[2];
81         enum pthread_timer_state state;
82         unsigned int rate;
83         /*! Interval in ms for current rate */
84         unsigned int interval;
85         unsigned int tick_count;
86         struct timeval start;
87 };
88
89 static void pthread_timer_destructor(void *obj);
90 static struct pthread_timer *find_timer(int handle, int unlinkobj);
91 static void write_byte(int wr_fd);
92 static void read_pipe(int rd_fd, unsigned int num, int clear);
93
94 /*!
95  * \brief Data for the timing thread
96  */
97 static struct {
98         pthread_t thread;
99         ast_mutex_t lock;
100         ast_cond_t cond;
101         unsigned int stop:1;
102 } timing_thread;
103
104 static int pthread_timer_open(void)
105 {
106         struct pthread_timer *timer;
107         int fd;
108
109         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
110                 errno = ENOMEM;
111                 return -1;
112         }
113
114         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
115         timer->state = TIMER_STATE_IDLE;
116
117         if (pipe(timer->pipe)) {
118                 ao2_ref(timer, -1);
119                 return -1;
120         }
121
122         ao2_lock(pthread_timers);
123         if (!ao2_container_count(pthread_timers)) {
124                 ast_mutex_lock(&timing_thread.lock);
125                 ast_cond_signal(&timing_thread.cond);
126                 ast_mutex_unlock(&timing_thread.lock);
127         }
128         ao2_link(pthread_timers, timer);
129         ao2_unlock(pthread_timers);
130
131         fd = timer->pipe[PIPE_READ];
132
133         ao2_ref(timer, -1);
134
135         return fd;
136 }
137
138 static void pthread_timer_close(int handle)
139 {
140         struct pthread_timer *timer;
141
142         if (!(timer = find_timer(handle, 1))) {
143                 return;
144         }
145
146         ao2_ref(timer, -1);
147 }
148
149 static int pthread_timer_set_rate(int handle, unsigned int rate)
150 {
151         struct pthread_timer *timer;
152
153         if (!(timer = find_timer(handle, 0))) {
154                 errno = EINVAL;
155                 return -1;
156         }
157
158         if (rate > MAX_RATE) {
159                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
160                         MAX_RATE);
161                 errno = EINVAL;
162                 return -1;
163         }
164
165         ao2_lock(timer);
166         timer->rate = rate;
167         timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
168         timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0;
169         timer->start = rate ? ast_tvnow() : ast_tv(0, 0);
170         timer->tick_count = 0;
171         ao2_unlock(timer);
172
173         ao2_ref(timer, -1);
174
175         return 0;
176 }
177
178 static void pthread_timer_ack(int handle, unsigned int quantity)
179 {
180         struct pthread_timer *timer;
181
182         ast_assert(quantity > 0);
183
184         if (!(timer = find_timer(handle, 0))) {
185                 return;
186         }
187
188         if (timer->state == TIMER_STATE_CONTINUOUS) {
189                 /* Leave the pipe alone, please! */
190                 return;
191         }
192
193         read_pipe(timer->pipe[PIPE_READ], quantity, 0);
194
195         ao2_ref(timer, -1);
196 }
197
198 static int pthread_timer_enable_continuous(int handle)
199 {
200         struct pthread_timer *timer;
201
202         if (!(timer = find_timer(handle, 0))) {
203                 errno = EINVAL;
204                 return -1;
205         }
206
207         ao2_lock(timer);
208         timer->state = TIMER_STATE_CONTINUOUS;
209         write_byte(timer->pipe[PIPE_WRITE]);
210         ao2_unlock(timer);
211
212         ao2_ref(timer, -1);
213
214         return 0;
215 }
216
217 static int pthread_timer_disable_continuous(int handle)
218 {
219         struct pthread_timer *timer;
220
221         if (!(timer = find_timer(handle, 0))) {
222                 errno = EINVAL;
223                 return -1;
224         }
225
226         ao2_lock(timer);
227         timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
228         read_pipe(timer->pipe[PIPE_READ], 0, 1);
229         ao2_unlock(timer);
230
231         ao2_ref(timer, -1);
232
233         return 0;
234 }
235
236 static enum ast_timing_event pthread_timer_get_event(int handle)
237 {
238         struct pthread_timer *timer;
239         enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
240
241         if (!(timer = find_timer(handle, 0))) {
242                 return res;
243         }
244
245         if (timer->state == TIMER_STATE_CONTINUOUS) {
246                 res = AST_TIMING_EVENT_CONTINUOUS;
247         }
248
249         ao2_ref(timer, -1);
250
251         return res;
252 }
253
254 static unsigned int pthread_timer_get_max_rate(int handle)
255 {
256         return MAX_RATE;
257 }
258
259 static struct pthread_timer *find_timer(int handle, int unlinkobj)
260 {
261         struct pthread_timer *timer;
262         struct pthread_timer tmp_timer;
263         int flags = OBJ_POINTER;
264
265         tmp_timer.pipe[PIPE_READ] = handle;
266
267         if (unlinkobj) {
268                 flags |= OBJ_UNLINK;
269         }
270
271         if (!(timer = ao2_find(pthread_timers, &tmp_timer, NULL, flags))) {
272                 ast_assert(timer != NULL);
273                 return NULL;
274         }
275
276         return timer;
277 }
278
279 static void pthread_timer_destructor(void *obj)
280 {
281         struct pthread_timer *timer = obj;
282
283         if (timer->pipe[PIPE_READ] > -1) {
284                 close(timer->pipe[PIPE_READ]);
285                 timer->pipe[PIPE_READ] = -1;
286         }
287
288         if (timer->pipe[PIPE_WRITE] > -1) {
289                 close(timer->pipe[PIPE_WRITE]);
290                 timer->pipe[PIPE_WRITE] = -1;
291         }
292 }
293
294 /*!
295  * \note only PIPE_READ is guaranteed valid 
296  */
297 static int pthread_timer_hash(const void *obj, const int flags)
298 {
299         const struct pthread_timer *timer = obj;
300
301         return timer->pipe[PIPE_READ];
302 }
303
304 /*!
305  * \note only PIPE_READ is guaranteed valid 
306  */
307 static int pthread_timer_cmp(void *obj, void *arg, void *data, int flags)
308 {
309         struct pthread_timer *timer1 = obj, *timer2 = arg;
310
311         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
312 }
313
314 /*!
315  * \retval 0 no timer tick needed
316  * \retval non-zero write to the timing pipe needed
317  */
318 static int check_timer(struct pthread_timer *timer)
319 {
320         struct timeval now;
321
322         if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
323                 return 0;       
324         }
325         
326         now = ast_tvnow();
327
328         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
329                 timer->tick_count++;
330                 if (!timer->tick_count) {
331                         timer->start = now;
332                 }
333                 return 1;
334         }
335
336         return 0;
337 }
338
339 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
340 {
341         ast_assert(quantity || clear);
342
343         if (!quantity && clear) {
344                 quantity = 1;
345         }
346
347         do {
348                 unsigned char buf[1024];
349                 ssize_t res;
350                 fd_set rfds;
351                 struct timeval timeout = {
352                         .tv_sec = 0,
353                 };
354
355                 /* Make sure there is data to read */
356                 FD_ZERO(&rfds);
357                 FD_SET(rd_fd, &rfds);
358
359                 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
360                         break;
361                 }
362
363                 res = read(rd_fd, buf, 
364                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
365
366                 if (res == -1) {
367                         if (errno == EAGAIN) {
368                                 continue;
369                         }
370                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
371                         break;
372                 }
373
374                 if (clear) {
375                         continue;
376                 }
377
378                 quantity -= res;
379         } while (quantity);
380 }
381
382 static void write_byte(int wr_fd)
383 {
384         do {
385                 ssize_t res;
386                 unsigned char x = 42;
387
388                 res = write(wr_fd, &x, 1); 
389
390                 if (res == -1) {
391                         if (errno == EAGAIN) {
392                                 continue;
393                         }
394                         ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
395                 }
396         } while (0);
397 }
398
399 static int run_timer(void *obj, void *arg, void *data, int flags)
400 {
401         struct pthread_timer *timer = obj;
402
403         if (timer->state == TIMER_STATE_IDLE) {
404                 return 0;
405         }
406
407         ao2_lock(timer);
408
409         if (check_timer(timer)) {
410                 write_byte(timer->pipe[PIPE_WRITE]);
411         }
412         
413         ao2_unlock(timer);
414
415         return 0;
416 }
417
418 static void *do_timing(void *arg)
419 {
420         struct timeval next_wakeup = ast_tvnow();
421
422         while (!timing_thread.stop) {
423                 struct timespec ts = { 0, };
424
425                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL, NULL);
426
427                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
428
429                 ts.tv_sec = next_wakeup.tv_sec;
430                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
431
432                 ast_mutex_lock(&timing_thread.lock);
433                 if (!timing_thread.stop) {
434                         if (ao2_container_count(pthread_timers)) {
435                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
436                         } else {
437                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
438                         }
439                 }
440                 ast_mutex_unlock(&timing_thread.lock);
441         }
442
443         return NULL;
444 }
445
446 static int init_timing_thread(void)
447 {
448         ast_mutex_init(&timing_thread.lock);
449         ast_cond_init(&timing_thread.cond, NULL);
450
451         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
452                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
453                 return -1;
454         }
455
456         return 0;
457 }
458
459 static int load_module(void)
460 {
461         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
462                 pthread_timer_hash, pthread_timer_cmp))) {
463                 return AST_MODULE_LOAD_DECLINE;
464         }
465
466         if (init_timing_thread()) {
467                 ao2_ref(pthread_timers, -1);
468                 pthread_timers = NULL;
469                 return AST_MODULE_LOAD_DECLINE;
470         }
471
472         return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
473                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
474 }
475
476 static int unload_module(void)
477 {
478 #if 0
479         /* XXX code to stop the timing thread ... */
480
481         ast_uninstall_timing_functions(timing_funcs_handle);
482         ao2_ref(pthread_timers, -1);
483 #endif
484
485         /* This module can not currently be unloaded.  No use count handling is being done. */
486
487         return -1;
488 }
489
490 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");