This is basically a complete rollback of r155401, as it was determined that
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface 
24  */
25
26 /*** MODULEINFO
27         <conflict>res_timing_timerfd</conflict>
28         <conflict>res_timing_dahdi</conflict>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
34
35 #include <math.h>
36 #include <sys/select.h>
37
38 #include "asterisk/module.h"
39 #include "asterisk/timing.h"
40 #include "asterisk/utils.h"
41 #include "asterisk/astobj2.h"
42 #include "asterisk/time.h"
43 #include "asterisk/lock.h"
44
45 static void *timing_funcs_handle;
46
47 static int pthread_timer_open(void);
48 static void pthread_timer_close(int handle);
49 static int pthread_timer_set_rate(int handle, unsigned int rate);
50 static void pthread_timer_ack(int handle, unsigned int quantity);
51 static int pthread_timer_enable_continuous(int handle);
52 static int pthread_timer_disable_continuous(int handle);
53 static enum ast_timing_event pthread_timer_get_event(int handle);
54 static unsigned int pthread_timer_get_max_rate(int handle);
55
56 static struct ast_timing_functions pthread_timing_functions = {
57         .timer_open = pthread_timer_open,
58         .timer_close = pthread_timer_close,
59         .timer_set_rate = pthread_timer_set_rate,
60         .timer_ack = pthread_timer_ack,
61         .timer_enable_continuous = pthread_timer_enable_continuous,
62         .timer_disable_continuous = pthread_timer_disable_continuous,
63         .timer_get_event = pthread_timer_get_event,
64         .timer_get_max_rate = pthread_timer_get_max_rate,
65 };
66
/*! Highest supported rate in ticks per second: 1 tick / 10 ms */
#define MAX_RATE 100

/*! Number of hash buckets requested for the pthread_timers container */
#define PTHREAD_TIMER_BUCKETS 563

/*! Container of every open timer, keyed by the read end of its pipe */
static struct ao2_container *pthread_timers;

/*! Indices into pthread_timer.pipe, matching the layout filled in by pipe() */
enum {
        PIPE_READ = 0,
        PIPE_WRITE = 1
};

enum pthread_timer_state {
        /*! No rate configured; the timing thread skips the timer */
        TIMER_STATE_IDLE = 0,
        /*! A non-zero rate is set; the timing thread writes one byte per tick */
        TIMER_STATE_TICKING = 1,
        /*! Continuous mode; a byte is kept in the pipe and acks leave it alone */
        TIMER_STATE_CONTINUOUS = 2,
};
83
84 struct pthread_timer {
85         int pipe[2];
86         enum pthread_timer_state state;
87         unsigned int rate;
88         /*! Interval in ms for current rate */
89         unsigned int interval;
90         unsigned int tick_count;
91         struct timeval start;
92 };
93
94 static void pthread_timer_destructor(void *obj);
95 static struct pthread_timer *find_timer(int handle, int unlinkobj);
96 static void write_byte(int wr_fd);
97 static void read_pipe(int rd_fd, unsigned int num, int clear);
98
99 /*!
100  * \brief Data for the timing thread
101  */
102 static struct {
103         pthread_t thread;
104         ast_mutex_t lock;
105         ast_cond_t cond;
106         unsigned int stop:1;
107 } timing_thread;
108
109 static int pthread_timer_open(void)
110 {
111         struct pthread_timer *timer;
112         int fd;
113
114         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
115                 errno = ENOMEM;
116                 return -1;
117         }
118
119         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
120         timer->state = TIMER_STATE_IDLE;
121
122         if (pipe(timer->pipe)) {
123                 ao2_ref(timer, -1);
124                 return -1;
125         }
126
127         ao2_lock(pthread_timers);
128         if (!ao2_container_count(pthread_timers)) {
129                 ast_mutex_lock(&timing_thread.lock);
130                 ast_cond_signal(&timing_thread.cond);
131                 ast_mutex_unlock(&timing_thread.lock);
132         }
133         ao2_link(pthread_timers, timer);
134         ao2_unlock(pthread_timers);
135
136         fd = timer->pipe[PIPE_READ];
137
138         ao2_ref(timer, -1);
139
140         return fd;
141 }
142
/*!
 * \brief Close a timer
 *
 * Removes the timer from the container and releases the reference returned
 * by the lookup.  Unknown handles are ignored.
 *
 * \param handle timer handle returned by pthread_timer_open()
 */
static void pthread_timer_close(int handle)
{
        struct pthread_timer *timer = find_timer(handle, 1);

        if (timer) {
                ao2_ref(timer, -1);
        }
}
153
154 static int pthread_timer_set_rate(int handle, unsigned int rate)
155 {
156         struct pthread_timer *timer;
157
158         if (!(timer = find_timer(handle, 0))) {
159                 errno = EINVAL;
160                 return -1;
161         }
162
163         if (rate > MAX_RATE) {
164                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
165                         MAX_RATE);
166                 errno = EINVAL;
167                 return -1;
168         }
169
170         ao2_lock(timer);
171         timer->rate = rate;
172         timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
173         timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0;
174         timer->start = rate ? ast_tvnow() : ast_tv(0, 0);
175         timer->tick_count = 0;
176         ao2_unlock(timer);
177
178         ao2_ref(timer, -1);
179
180         return 0;
181 }
182
183 static void pthread_timer_ack(int handle, unsigned int quantity)
184 {
185         struct pthread_timer *timer;
186
187         ast_assert(quantity > 0);
188
189         if (!(timer = find_timer(handle, 0))) {
190                 return;
191         }
192
193         if (timer->state == TIMER_STATE_CONTINUOUS) {
194                 /* Leave the pipe alone, please! */
195                 return;
196         }
197
198         read_pipe(timer->pipe[PIPE_READ], quantity, 0);
199
200         ao2_ref(timer, -1);
201 }
202
203 static int pthread_timer_enable_continuous(int handle)
204 {
205         struct pthread_timer *timer;
206
207         if (!(timer = find_timer(handle, 0))) {
208                 errno = EINVAL;
209                 return -1;
210         }
211
212         ao2_lock(timer);
213         timer->state = TIMER_STATE_CONTINUOUS;
214         write_byte(timer->pipe[PIPE_WRITE]);
215         ao2_unlock(timer);
216
217         ao2_ref(timer, -1);
218
219         return 0;
220 }
221
222 static int pthread_timer_disable_continuous(int handle)
223 {
224         struct pthread_timer *timer;
225
226         if (!(timer = find_timer(handle, 0))) {
227                 errno = EINVAL;
228                 return -1;
229         }
230
231         ao2_lock(timer);
232         timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
233         read_pipe(timer->pipe[PIPE_READ], 0, 1);
234         ao2_unlock(timer);
235
236         ao2_ref(timer, -1);
237
238         return 0;
239 }
240
241 static enum ast_timing_event pthread_timer_get_event(int handle)
242 {
243         struct pthread_timer *timer;
244         enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
245
246         if (!(timer = find_timer(handle, 0))) {
247                 return res;
248         }
249
250         if (timer->state == TIMER_STATE_CONTINUOUS) {
251                 res = AST_TIMING_EVENT_CONTINUOUS;
252         }
253
254         ao2_ref(timer, -1);
255
256         return res;
257 }
258
259 static unsigned int pthread_timer_get_max_rate(int handle)
260 {
261         return MAX_RATE;
262 }
263
264 static struct pthread_timer *find_timer(int handle, int unlinkobj)
265 {
266         struct pthread_timer *timer;
267         struct pthread_timer tmp_timer;
268         int flags = OBJ_POINTER;
269
270         tmp_timer.pipe[PIPE_READ] = handle;
271
272         if (unlinkobj) {
273                 flags |= OBJ_UNLINK;
274         }
275
276         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
277                 ast_assert(timer != NULL);
278                 return NULL;
279         }
280
281         return timer;
282 }
283
284 static void pthread_timer_destructor(void *obj)
285 {
286         struct pthread_timer *timer = obj;
287
288         if (timer->pipe[PIPE_READ] > -1) {
289                 close(timer->pipe[PIPE_READ]);
290                 timer->pipe[PIPE_READ] = -1;
291         }
292
293         if (timer->pipe[PIPE_WRITE] > -1) {
294                 close(timer->pipe[PIPE_WRITE]);
295                 timer->pipe[PIPE_WRITE] = -1;
296         }
297 }
298
299 /*!
300  * \note only PIPE_READ is guaranteed valid 
301  */
302 static int pthread_timer_hash(const void *obj, const int flags)
303 {
304         const struct pthread_timer *timer = obj;
305
306         return timer->pipe[PIPE_READ];
307 }
308
309 /*!
310  * \note only PIPE_READ is guaranteed valid 
311  */
312 static int pthread_timer_cmp(void *obj, void *arg, int flags)
313 {
314         struct pthread_timer *timer1 = obj, *timer2 = arg;
315
316         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
317 }
318
319 /*!
320  * \retval 0 no timer tick needed
321  * \retval non-zero write to the timing pipe needed
322  */
323 static int check_timer(struct pthread_timer *timer)
324 {
325         struct timeval now;
326
327         if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
328                 return 0;       
329         }
330         
331         now = ast_tvnow();
332
333         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
334                 timer->tick_count++;
335                 if (!timer->tick_count) {
336                         timer->start = now;
337                 }
338                 return 1;
339         }
340
341         return 0;
342 }
343
344 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
345 {
346         ast_assert(quantity || clear);
347
348         if (!quantity && clear) {
349                 quantity = 1;
350         }
351
352         do {
353                 unsigned char buf[1024];
354                 ssize_t res;
355                 fd_set rfds;
356                 struct timeval timeout = {
357                         .tv_sec = 0,
358                 };
359
360                 /* Make sure there is data to read */
361                 FD_ZERO(&rfds);
362                 FD_SET(rd_fd, &rfds);
363
364                 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
365                         break;
366                 }
367
368                 res = read(rd_fd, buf, 
369                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
370
371                 if (res == -1) {
372                         if (errno == EAGAIN) {
373                                 continue;
374                         }
375                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
376                         break;
377                 }
378
379                 if (clear) {
380                         continue;
381                 }
382
383                 quantity -= res;
384         } while (quantity);
385 }
386
387 static void write_byte(int wr_fd)
388 {
389         do {
390                 ssize_t res;
391                 unsigned char x = 42;
392
393                 res = write(wr_fd, &x, 1); 
394
395                 if (res == -1) {
396                         if (errno == EAGAIN) {
397                                 continue;
398                         }
399                         ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
400                 }
401         } while (0);
402 }
403
404 static int run_timer(void *obj, void *arg, int flags)
405 {
406         struct pthread_timer *timer = obj;
407
408         if (timer->state == TIMER_STATE_IDLE) {
409                 return 0;
410         }
411
412         ao2_lock(timer);
413
414         if (check_timer(timer)) {
415                 write_byte(timer->pipe[PIPE_WRITE]);
416         }
417         
418         ao2_unlock(timer);
419
420         return 0;
421 }
422
423 static void *do_timing(void *arg)
424 {
425         struct timeval next_wakeup = ast_tvnow();
426
427         while (!timing_thread.stop) {
428                 struct timespec ts = { 0, };
429
430                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
431
432                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
433
434                 ts.tv_sec = next_wakeup.tv_sec;
435                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
436
437                 ast_mutex_lock(&timing_thread.lock);
438                 if (!timing_thread.stop) {
439                         if (ao2_container_count(pthread_timers)) {
440                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
441                         } else {
442                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
443                         }
444                 }
445                 ast_mutex_unlock(&timing_thread.lock);
446         }
447
448         return NULL;
449 }
450
451 static int init_timing_thread(void)
452 {
453         ast_mutex_init(&timing_thread.lock);
454         ast_cond_init(&timing_thread.cond, NULL);
455
456         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
457                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
458                 return -1;
459         }
460
461         return 0;
462 }
463
464 static int load_module(void)
465 {
466         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
467                 pthread_timer_hash, pthread_timer_cmp))) {
468                 return AST_MODULE_LOAD_DECLINE;
469         }
470
471         if (init_timing_thread()) {
472                 ao2_ref(pthread_timers, -1);
473                 pthread_timers = NULL;
474                 return AST_MODULE_LOAD_DECLINE;
475         }
476
477         return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
478                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
479 }
480
481 static int unload_module(void)
482 {
483 #if 0
484         /* XXX code to stop the timing thread ... */
485
486         ast_uninstall_timing_functions(timing_funcs_handle);
487         ao2_ref(pthread_timers, -1);
488 #endif
489
490         /* This module can not currently be unloaded.  No use count handling is being done. */
491
492         return -1;
493 }
494
495 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");