f0a1d0e983dac078a5cf1f472da61c306852669d
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface 
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
29
30 #include <math.h>
31 #include <sys/select.h>
32
33 #include "asterisk/module.h"
34 #include "asterisk/timing.h"
35 #include "asterisk/utils.h"
36 #include "asterisk/astobj2.h"
37 #include "asterisk/time.h"
38 #include "asterisk/lock.h"
39
40 static void *timing_funcs_handle;
41
42 static int pthread_timer_open(void);
43 static void pthread_timer_close(int handle);
44 static int pthread_timer_set_rate(int handle, unsigned int rate);
45 static void pthread_timer_ack(int handle, unsigned int quantity);
46 static int pthread_timer_enable_continuous(int handle);
47 static int pthread_timer_disable_continuous(int handle);
48 static enum ast_timing_event pthread_timer_get_event(int handle);
49
50 static struct ast_timing_functions pthread_timing_functions = {
51         .timer_open = pthread_timer_open,
52         .timer_close = pthread_timer_close,
53         .timer_set_rate = pthread_timer_set_rate,
54         .timer_ack = pthread_timer_ack,
55         .timer_enable_continuous = pthread_timer_enable_continuous,
56         .timer_disable_continuous = pthread_timer_disable_continuous,
57         .timer_get_event = pthread_timer_get_event,
58 };
59
60 /* 1 tick / 10 ms */
61 #define MAX_RATE 100
62
63 static struct ao2_container *pthread_timers;
64 #define PTHREAD_TIMER_BUCKETS 563
65
66 enum {
67         PIPE_READ =  0,
68         PIPE_WRITE = 1
69 };
70
71 enum pthread_timer_state {
72         TIMER_STATE_IDLE,
73         TIMER_STATE_TICKING,
74         TIMER_STATE_CONTINUOUS,
75 };
76
77 struct pthread_timer {
78         int pipe[2];
79         enum pthread_timer_state state;
80         unsigned int rate;
81         /*! Interval in ms for current rate */
82         unsigned int interval;
83         unsigned int tick_count;
84         struct timeval start;
85 };
86
87 static void pthread_timer_destructor(void *obj);
88 static struct pthread_timer *find_timer(int handle, int unlink);
89 static void write_byte(int wr_fd);
90 static void read_pipe(int rd_fd, unsigned int num, int clear);
91
92 /*!
93  * \brief Data for the timing thread
94  */
95 static struct {
96         pthread_t thread;
97         ast_mutex_t lock;
98         ast_cond_t cond;
99         unsigned int stop:1;
100 } timing_thread;
101
102 static int pthread_timer_open(void)
103 {
104         struct pthread_timer *timer;
105         int fd;
106
107         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
108                 errno = ENOMEM;
109                 return -1;
110         }
111
112         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
113         timer->state = TIMER_STATE_IDLE;
114
115         if (pipe(timer->pipe)) {
116                 ao2_ref(timer, -1);
117                 return -1;
118         }
119
120         ao2_lock(pthread_timers);
121         if (!ao2_container_count(pthread_timers)) {
122                 ast_mutex_lock(&timing_thread.lock);
123                 ast_cond_signal(&timing_thread.cond);
124                 ast_mutex_unlock(&timing_thread.lock);
125         }
126         ao2_link(pthread_timers, timer);
127         ao2_unlock(pthread_timers);
128
129         fd = timer->pipe[PIPE_READ];
130
131         ao2_ref(timer, -1);
132
133         return fd;
134 }
135
136 static void pthread_timer_close(int handle)
137 {
138         struct pthread_timer *timer;
139
140         if (!(timer = find_timer(handle, 1))) {
141                 return;
142         }
143
144         ao2_ref(timer, -1);
145 }
146
147 static int pthread_timer_set_rate(int handle, unsigned int rate)
148 {
149         struct pthread_timer *timer;
150
151         if (!(timer = find_timer(handle, 0))) {
152                 errno = EINVAL;
153                 return -1;
154         }
155
156         if (rate > MAX_RATE) {
157                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
158                         MAX_RATE);
159                 errno = EINVAL;
160                 return -1;
161         }
162
163         ao2_lock(timer);
164         timer->rate = rate;
165         timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
166         timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0;
167         timer->start = rate ? ast_tvnow() : ast_tv(0, 0);
168         timer->tick_count = 0;
169         ao2_unlock(timer);
170
171         ao2_ref(timer, -1);
172
173         return 0;
174 }
175
176 static void pthread_timer_ack(int handle, unsigned int quantity)
177 {
178         struct pthread_timer *timer;
179
180         ast_assert(quantity > 0);
181
182         if (!(timer = find_timer(handle, 0))) {
183                 return;
184         }
185
186         if (timer->state == TIMER_STATE_CONTINUOUS) {
187                 /* Leave the pipe alone, please! */
188                 return;
189         }
190
191         read_pipe(timer->pipe[PIPE_READ], quantity, 0);
192
193         ao2_ref(timer, -1);
194 }
195
196 static int pthread_timer_enable_continuous(int handle)
197 {
198         struct pthread_timer *timer;
199
200         if (!(timer = find_timer(handle, 0))) {
201                 errno = EINVAL;
202                 return -1;
203         }
204
205         ao2_lock(timer);
206         timer->state = TIMER_STATE_CONTINUOUS;
207         write_byte(timer->pipe[PIPE_WRITE]);
208         ao2_unlock(timer);
209
210         ao2_ref(timer, -1);
211
212         return 0;
213 }
214
215 static int pthread_timer_disable_continuous(int handle)
216 {
217         struct pthread_timer *timer;
218
219         if (!(timer = find_timer(handle, 0))) {
220                 errno = EINVAL;
221                 return -1;
222         }
223
224         ao2_lock(timer);
225         timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
226         read_pipe(timer->pipe[PIPE_READ], 0, 1);
227         ao2_unlock(timer);
228
229         ao2_ref(timer, -1);
230
231         return 0;
232 }
233
234 static enum ast_timing_event pthread_timer_get_event(int handle)
235 {
236         struct pthread_timer *timer;
237         enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
238
239         if (!(timer = find_timer(handle, 0))) {
240                 return res;
241         }
242
243         if (timer->state == TIMER_STATE_CONTINUOUS) {
244                 res = AST_TIMING_EVENT_CONTINUOUS;
245         }
246
247         ao2_ref(timer, -1);
248
249         return res;
250 }
251
252 static struct pthread_timer *find_timer(int handle, int unlink)
253 {
254         struct pthread_timer *timer;
255         struct pthread_timer tmp_timer;
256         int flags = OBJ_POINTER;
257
258         tmp_timer.pipe[PIPE_READ] = handle;
259
260         if (unlink) {
261                 flags |= OBJ_UNLINK;
262         }
263
264         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
265                 ast_assert(timer != NULL);
266                 return NULL;
267         }
268
269         return timer;
270 }
271
272 static void pthread_timer_destructor(void *obj)
273 {
274         struct pthread_timer *timer = obj;
275
276         if (timer->pipe[PIPE_READ] > -1) {
277                 close(timer->pipe[PIPE_READ]);
278                 timer->pipe[PIPE_READ] = -1;
279         }
280
281         if (timer->pipe[PIPE_WRITE] > -1) {
282                 close(timer->pipe[PIPE_WRITE]);
283                 timer->pipe[PIPE_WRITE] = -1;
284         }
285 }
286
287 /*!
288  * \note only PIPE_READ is guaranteed valid 
289  */
290 static int pthread_timer_hash(const void *obj, const int flags)
291 {
292         const struct pthread_timer *timer = obj;
293
294         return timer->pipe[PIPE_READ];
295 }
296
297 /*!
298  * \note only PIPE_READ is guaranteed valid 
299  */
300 static int pthread_timer_cmp(void *obj, void *arg, int flags)
301 {
302         struct pthread_timer *timer1 = obj, *timer2 = arg;
303
304         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH : 0;
305 }
306
307 /*!
308  * \retval 0 no timer tick needed
309  * \retval non-zero write to the timing pipe needed
310  */
311 static int check_timer(struct pthread_timer *timer)
312 {
313         struct timeval now;
314
315         if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
316                 return 0;       
317         }
318         
319         now = ast_tvnow();
320
321         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
322                 timer->tick_count++;
323                 if (!timer->tick_count) {
324                         timer->start = now;
325                 }
326                 return 1;
327         }
328
329         return 0;
330 }
331
332 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
333 {
334         ast_assert(quantity || clear);
335
336         if (!quantity && clear) {
337                 quantity = 1;
338         }
339
340         do {
341                 unsigned char buf[1024];
342                 ssize_t res;
343                 fd_set rfds;
344                 struct timeval tv = {
345                         .tv_sec = 0,
346                 };
347
348                 /* Make sure there is data to read */
349                 FD_ZERO(&rfds);
350                 FD_SET(rd_fd, &rfds);
351
352                 if (select(rd_fd + 1, &rfds, NULL, NULL, &tv) != 1) {
353                         break;
354                 }
355
356                 res = read(rd_fd, buf, 
357                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
358
359                 if (res == -1) {
360                         if (errno == EAGAIN) {
361                                 continue;
362                         }
363                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
364                         break;
365                 }
366
367                 if (clear) {
368                         continue;
369                 }
370
371                 quantity -= res;
372         } while (quantity);
373 }
374
375 static void write_byte(int wr_fd)
376 {
377         do {
378                 ssize_t res;
379                 unsigned char x = 42;
380
381                 res = write(wr_fd, &x, 1); 
382
383                 if (res == -1) {
384                         if (errno == EAGAIN) {
385                                 continue;
386                         }
387                         ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
388                 }
389         } while (0);
390 }
391
392 static int run_timer(void *obj, void *arg, int flags)
393 {
394         struct pthread_timer *timer = obj;
395
396         if (timer->state == TIMER_STATE_IDLE) {
397                 return 0;
398         }
399
400         ao2_lock(timer);
401
402         if (check_timer(timer)) {
403                 write_byte(timer->pipe[PIPE_WRITE]);
404         }
405         
406         ao2_unlock(timer);
407
408         return 0;
409 }
410
411 static void *do_timing(void *arg)
412 {
413         struct timeval next_wakeup = ast_tvnow();
414
415         while (!timing_thread.stop) {
416                 struct timespec ts = { 0, };
417
418                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
419
420                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
421
422                 ts.tv_sec = next_wakeup.tv_sec;
423                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
424
425                 ast_mutex_lock(&timing_thread.lock);
426                 if (!timing_thread.stop) {
427                         if (ao2_container_count(pthread_timers)) {
428                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
429                         } else {
430                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
431                         }
432                 }
433                 ast_mutex_unlock(&timing_thread.lock);
434         }
435
436         return NULL;
437 }
438
439 static int init_timing_thread(void)
440 {
441         ast_mutex_init(&timing_thread.lock);
442         ast_cond_init(&timing_thread.cond, NULL);
443
444         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
445                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
446                 return -1;
447         }
448
449         return 0;
450 }
451
452 static int load_module(void)
453 {
454         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
455                 pthread_timer_hash, pthread_timer_cmp))) {
456                 return AST_MODULE_LOAD_DECLINE;
457         }
458
459         if (init_timing_thread()) {
460                 ao2_ref(pthread_timers, -1);
461                 pthread_timers = NULL;
462                 return AST_MODULE_LOAD_DECLINE;
463         }
464
465         return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
466                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
467 }
468
469 static int unload_module(void)
470 {
471 #if 0
472         /* XXX code to stop the timing thread ... */
473
474         ast_uninstall_timing_functions(timing_funcs_handle);
475         ao2_ref(pthread_timers, -1);
476 #endif
477
478         /* This module can not currently be unloaded.  No use count handling is being done. */
479
480         return -1;
481 }
482
483 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");