fix a few more XML documentation problems
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface 
24  */
25
26 /*** MODULEINFO
27         <conflict>res_timing_timerfd</conflict>
28         <conflict>res_timing_dahdi</conflict>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
34
35 #include <math.h>
36 #include <sys/select.h>
37
38 #include "asterisk/module.h"
39 #include "asterisk/timing.h"
40 #include "asterisk/utils.h"
41 #include "asterisk/astobj2.h"
42 #include "asterisk/time.h"
43 #include "asterisk/lock.h"
44
/*!
 * \brief Handle returned by ast_install_timing_functions() in load_module().
 *
 * The (currently disabled) unload code would hand it back to
 * ast_uninstall_timing_functions().
 */
static void *timing_funcs_handle;
46
/*
 * Timing interface callbacks.  They are forward-declared here so that they
 * can be referenced from pthread_timing_functions before being defined.
 * In every callback, "handle" is the read end of the timer's pipe as
 * returned by pthread_timer_open().
 */
static int pthread_timer_open(void);
static void pthread_timer_close(int handle);
static int pthread_timer_set_rate(int handle, unsigned int rate);
static void pthread_timer_ack(int handle, unsigned int quantity);
static int pthread_timer_enable_continuous(int handle);
static int pthread_timer_disable_continuous(int handle);
static enum ast_timing_event pthread_timer_get_event(int handle);
static unsigned int pthread_timer_get_max_rate(int handle);
55
/*!
 * \brief Callback table describing this timing implementation.
 *
 * Passed to ast_install_timing_functions() by load_module().
 */
static struct ast_timing_functions pthread_timing_functions = {
        .timer_open = pthread_timer_open,
        .timer_close = pthread_timer_close,
        .timer_set_rate = pthread_timer_set_rate,
        .timer_ack = pthread_timer_ack,
        .timer_enable_continuous = pthread_timer_enable_continuous,
        .timer_disable_continuous = pthread_timer_disable_continuous,
        .timer_get_event = pthread_timer_get_event,
        .timer_get_max_rate = pthread_timer_get_max_rate,
};
66
/*
 * Highest rate, in ticks per second, accepted by pthread_timer_set_rate()
 * and reported by pthread_timer_get_max_rate(): 1 tick / 10 ms.
 */
#define MAX_RATE 100

/* Container of all open timers; hashed and compared by the pipe's read fd. */
static struct ao2_container *pthread_timers;
/* Number of hash buckets requested for pthread_timers in load_module(). */
#define PTHREAD_TIMER_BUCKETS 563
72
/* Indices into pthread_timer.pipe[], matching the layout filled in by pipe(). */
enum {
        PIPE_READ =  0,
        PIPE_WRITE = 1
};
77
/* Operating mode of a single timer. */
enum pthread_timer_state {
        /* No ticks are produced; chosen by set_state() when the rate is 0. */
        TIMER_STATE_IDLE,
        /* The timing thread writes one byte to the pipe per elapsed interval. */
        TIMER_STATE_TICKING,
        /*
         * Entered via pthread_timer_enable_continuous(), which writes a byte
         * to the pipe; pthread_timer_ack() leaves the pipe alone in this state.
         */
        TIMER_STATE_CONTINUOUS,
};
83
/* State of one timer; allocated as an astobj2 object in pthread_timer_open(). */
struct pthread_timer {
        /*! Pipe used to signal ticks; the read end doubles as the timer handle */
        int pipe[2];
        /*! Current operating mode */
        enum pthread_timer_state state;
        /*! Requested rate in ticks per second; 0 means no ticking */
        unsigned int rate;
        /*! Interval in ms for current rate */
        unsigned int interval;
        /*! Ticks written since \c start; reset by set_state() */
        unsigned int tick_count;
        /*! Time reference for tick_count; set by set_state() and on tick_count wrap */
        struct timeval start;
};
93
/*
 * Internal helpers, declared up front because the timing callbacks use them
 * before their definitions.  Parameter names match the definitions below.
 */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(int wr_fd);
static void read_pipe(int rd_fd, unsigned int quantity, int clear);
98
/*!
 * \brief Data for the timing thread
 */
static struct {
        /*! Thread running do_timing(); created by init_timing_thread() */
        pthread_t thread;
        /*! Held while waiting on or signalling \c cond */
        ast_mutex_t lock;
        /*! Signalled by pthread_timer_open() when a timer is added to an empty container */
        ast_cond_t cond;
        /*! When set, do_timing() leaves its loop */
        unsigned int stop:1;
} timing_thread;
108
109 static int pthread_timer_open(void)
110 {
111         struct pthread_timer *timer;
112         int fd;
113
114         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
115                 errno = ENOMEM;
116                 return -1;
117         }
118
119         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
120         timer->state = TIMER_STATE_IDLE;
121
122         if (pipe(timer->pipe)) {
123                 ao2_ref(timer, -1);
124                 return -1;
125         }
126
127         ao2_lock(pthread_timers);
128         if (!ao2_container_count(pthread_timers)) {
129                 ast_mutex_lock(&timing_thread.lock);
130                 ast_cond_signal(&timing_thread.cond);
131                 ast_mutex_unlock(&timing_thread.lock);
132         }
133         ao2_link(pthread_timers, timer);
134         ao2_unlock(pthread_timers);
135
136         fd = timer->pipe[PIPE_READ];
137
138         ao2_ref(timer, -1);
139
140         return fd;
141 }
142
/*!
 * \brief Remove a timer from the container and release the lookup reference.
 *
 * \param handle read end of the timer's pipe, as returned by pthread_timer_open()
 *
 * The pipe itself is closed by pthread_timer_destructor().
 */
static void pthread_timer_close(int handle)
{
        struct pthread_timer *doomed = find_timer(handle, 1);

        if (doomed) {
                ao2_ref(doomed, -1);
        }
}
153
154 static void set_state(struct pthread_timer *timer)
155 {
156         unsigned int rate = timer->rate;
157
158         if (rate) {
159                 timer->state = TIMER_STATE_TICKING;
160                 timer->interval = roundf(1000.0 / ((float) rate));
161                 timer->start = ast_tvnow();
162         } else {
163                 timer->state = TIMER_STATE_IDLE;
164                 timer->interval = 0;
165                 timer->start = ast_tv(0, 0);
166         }
167
168         timer->tick_count = 0;
169 }
170
171 static int pthread_timer_set_rate(int handle, unsigned int rate)
172 {
173         struct pthread_timer *timer;
174
175         if (!(timer = find_timer(handle, 0))) {
176                 errno = EINVAL;
177                 return -1;
178         }
179
180         if (rate > MAX_RATE) {
181                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
182                         MAX_RATE);
183                 errno = EINVAL;
184                 return -1;
185         }
186
187         ao2_lock(timer);
188         timer->rate = rate;
189         if (timer->state != TIMER_STATE_CONTINUOUS) {
190                 set_state(timer);
191         }
192         
193         ao2_unlock(timer);
194
195         ao2_ref(timer, -1);
196
197         return 0;
198 }
199
200 static void pthread_timer_ack(int handle, unsigned int quantity)
201 {
202         struct pthread_timer *timer;
203
204         ast_assert(quantity > 0);
205
206         if (!(timer = find_timer(handle, 0))) {
207                 return;
208         }
209
210         if (timer->state == TIMER_STATE_CONTINUOUS) {
211                 /* Leave the pipe alone, please! */
212                 return;
213         }
214
215         read_pipe(timer->pipe[PIPE_READ], quantity, 0);
216
217         ao2_ref(timer, -1);
218 }
219
220 static int pthread_timer_enable_continuous(int handle)
221 {
222         struct pthread_timer *timer;
223
224         if (!(timer = find_timer(handle, 0))) {
225                 errno = EINVAL;
226                 return -1;
227         }
228
229         ao2_lock(timer);
230         timer->state = TIMER_STATE_CONTINUOUS;
231         write_byte(timer->pipe[PIPE_WRITE]);
232         ao2_unlock(timer);
233
234         ao2_ref(timer, -1);
235
236         return 0;
237 }
238
239 static int pthread_timer_disable_continuous(int handle)
240 {
241         struct pthread_timer *timer;
242
243         if (!(timer = find_timer(handle, 0))) {
244                 errno = EINVAL;
245                 return -1;
246         }
247
248         ao2_lock(timer);
249         set_state(timer);
250         read_pipe(timer->pipe[PIPE_READ], 0, 1);
251         ao2_unlock(timer);
252
253         ao2_ref(timer, -1);
254
255         return 0;
256 }
257
258 static enum ast_timing_event pthread_timer_get_event(int handle)
259 {
260         struct pthread_timer *timer;
261         enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
262
263         if (!(timer = find_timer(handle, 0))) {
264                 return res;
265         }
266
267         if (timer->state == TIMER_STATE_CONTINUOUS) {
268                 res = AST_TIMING_EVENT_CONTINUOUS;
269         }
270
271         ao2_ref(timer, -1);
272
273         return res;
274 }
275
276 static unsigned int pthread_timer_get_max_rate(int handle)
277 {
278         return MAX_RATE;
279 }
280
281 static struct pthread_timer *find_timer(int handle, int unlinkobj)
282 {
283         struct pthread_timer *timer;
284         struct pthread_timer tmp_timer;
285         int flags = OBJ_POINTER;
286
287         tmp_timer.pipe[PIPE_READ] = handle;
288
289         if (unlinkobj) {
290                 flags |= OBJ_UNLINK;
291         }
292
293         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
294                 ast_assert(timer != NULL);
295                 return NULL;
296         }
297
298         return timer;
299 }
300
301 static void pthread_timer_destructor(void *obj)
302 {
303         struct pthread_timer *timer = obj;
304
305         if (timer->pipe[PIPE_READ] > -1) {
306                 close(timer->pipe[PIPE_READ]);
307                 timer->pipe[PIPE_READ] = -1;
308         }
309
310         if (timer->pipe[PIPE_WRITE] > -1) {
311                 close(timer->pipe[PIPE_WRITE]);
312                 timer->pipe[PIPE_WRITE] = -1;
313         }
314 }
315
316 /*!
317  * \note only PIPE_READ is guaranteed valid 
318  */
319 static int pthread_timer_hash(const void *obj, const int flags)
320 {
321         const struct pthread_timer *timer = obj;
322
323         return timer->pipe[PIPE_READ];
324 }
325
326 /*!
327  * \note only PIPE_READ is guaranteed valid 
328  */
329 static int pthread_timer_cmp(void *obj, void *arg, int flags)
330 {
331         struct pthread_timer *timer1 = obj, *timer2 = arg;
332
333         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
334 }
335
336 /*!
337  * \retval 0 no timer tick needed
338  * \retval non-zero write to the timing pipe needed
339  */
340 static int check_timer(struct pthread_timer *timer)
341 {
342         struct timeval now;
343
344         if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
345                 return 0;       
346         }
347         
348         now = ast_tvnow();
349
350         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
351                 timer->tick_count++;
352                 if (!timer->tick_count) {
353                         timer->start = now;
354                 }
355                 return 1;
356         }
357
358         return 0;
359 }
360
361 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
362 {
363         ast_assert(quantity || clear);
364
365         if (!quantity && clear) {
366                 quantity = 1;
367         }
368
369         do {
370                 unsigned char buf[1024];
371                 ssize_t res;
372                 fd_set rfds;
373                 struct timeval timeout = {
374                         .tv_sec = 0,
375                 };
376
377                 /* Make sure there is data to read */
378                 FD_ZERO(&rfds);
379                 FD_SET(rd_fd, &rfds);
380
381                 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
382                         break;
383                 }
384
385                 res = read(rd_fd, buf, 
386                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
387
388                 if (res == -1) {
389                         if (errno == EAGAIN) {
390                                 continue;
391                         }
392                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
393                         break;
394                 }
395
396                 if (clear) {
397                         continue;
398                 }
399
400                 quantity -= res;
401         } while (quantity);
402 }
403
404 static void write_byte(int wr_fd)
405 {
406         do {
407                 ssize_t res;
408                 unsigned char x = 42;
409
410                 res = write(wr_fd, &x, 1); 
411
412                 if (res == -1) {
413                         if (errno == EAGAIN) {
414                                 continue;
415                         }
416                         ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
417                 }
418         } while (0);
419 }
420
421 static int run_timer(void *obj, void *arg, int flags)
422 {
423         struct pthread_timer *timer = obj;
424
425         if (timer->state == TIMER_STATE_IDLE) {
426                 return 0;
427         }
428
429         ao2_lock(timer);
430
431         if (check_timer(timer)) {
432                 write_byte(timer->pipe[PIPE_WRITE]);
433         }
434         
435         ao2_unlock(timer);
436
437         return 0;
438 }
439
440 static void *do_timing(void *arg)
441 {
442         struct timeval next_wakeup = ast_tvnow();
443
444         while (!timing_thread.stop) {
445                 struct timespec ts = { 0, };
446
447                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
448
449                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
450
451                 ts.tv_sec = next_wakeup.tv_sec;
452                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
453
454                 ast_mutex_lock(&timing_thread.lock);
455                 if (!timing_thread.stop) {
456                         if (ao2_container_count(pthread_timers)) {
457                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
458                         } else {
459                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
460                         }
461                 }
462                 ast_mutex_unlock(&timing_thread.lock);
463         }
464
465         return NULL;
466 }
467
468 static int init_timing_thread(void)
469 {
470         ast_mutex_init(&timing_thread.lock);
471         ast_cond_init(&timing_thread.cond, NULL);
472
473         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
474                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
475                 return -1;
476         }
477
478         return 0;
479 }
480
481 static int load_module(void)
482 {
483         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
484                 pthread_timer_hash, pthread_timer_cmp))) {
485                 return AST_MODULE_LOAD_DECLINE;
486         }
487
488         if (init_timing_thread()) {
489                 ao2_ref(pthread_timers, -1);
490                 pthread_timers = NULL;
491                 return AST_MODULE_LOAD_DECLINE;
492         }
493
494         return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
495                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
496 }
497
/*!
 * \brief Module unload entry point.
 *
 * This module can not currently be unloaded: no use count handling is being
 * done and the timing thread is never stopped, so the request is refused.
 *
 * \retval -1 always
 */
static int unload_module(void)
{
#if 0
        /* XXX code to stop the timing thread ... */

        ast_uninstall_timing_functions(timing_funcs_handle);
        ao2_ref(pthread_timers, -1);
#endif

        return -1;
}
511
/* Module registration, with the module's license key and description. */
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");