/*
 * Merged revisions 328247 via svnmerge
 * Source: asterisk/asterisk.git, res/res_timing_pthread.c
 */
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25
26 /*** MODULEINFO
27         <support_level>extended</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include <math.h>
35 #include <sys/select.h>
36
37 #include "asterisk/module.h"
38 #include "asterisk/timing.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/astobj2.h"
41 #include "asterisk/time.h"
42 #include "asterisk/lock.h"
43 #include "asterisk/poll-compat.h"
44
45 static void *timing_funcs_handle;
46
47 static int pthread_timer_open(void);
48 static void pthread_timer_close(int handle);
49 static int pthread_timer_set_rate(int handle, unsigned int rate);
50 static void pthread_timer_ack(int handle, unsigned int quantity);
51 static int pthread_timer_enable_continuous(int handle);
52 static int pthread_timer_disable_continuous(int handle);
53 static enum ast_timer_event pthread_timer_get_event(int handle);
54 static unsigned int pthread_timer_get_max_rate(int handle);
55
56 static struct ast_timing_interface pthread_timing = {
57         .name = "pthread",
58         .priority = 0, /* use this as a last resort */
59         .timer_open = pthread_timer_open,
60         .timer_close = pthread_timer_close,
61         .timer_set_rate = pthread_timer_set_rate,
62         .timer_ack = pthread_timer_ack,
63         .timer_enable_continuous = pthread_timer_enable_continuous,
64         .timer_disable_continuous = pthread_timer_disable_continuous,
65         .timer_get_event = pthread_timer_get_event,
66         .timer_get_max_rate = pthread_timer_get_max_rate,
67 };
68
/*! Highest rate accepted by pthread_timer_set_rate(): 1 tick / 10 ms */
#define MAX_RATE 100

/*! Bucket count passed to ao2_container_alloc() for \ref pthread_timers */
#define PTHREAD_TIMER_BUCKETS 563

/*! Container of every open timer, hashed and compared by the pipe's read fd */
static struct ao2_container *pthread_timers;
74
/*! Indexes into pthread_timer.pipe[] */
enum {
	/*! End handed to the timer user as its handle and drained by read_pipe() */
	PIPE_READ =  0,
	/*! End written to by write_byte() */
	PIPE_WRITE = 1
};

/*! Whether the timing thread should generate ticks for a timer */
enum pthread_timer_state {
	/*! No rate configured; the timer is skipped by the timing thread */
	TIMER_STATE_IDLE,
	/*! A non-zero rate is configured; ticks are generated */
	TIMER_STATE_TICKING,
};
84
85 struct pthread_timer {
86         int pipe[2];
87         enum pthread_timer_state state;
88         unsigned int rate;
89         /*! Interval in ms for current rate */
90         unsigned int interval;
91         unsigned int tick_count;
92         unsigned int pending_ticks;
93         struct timeval start;
94         unsigned int continuous:1;
95 };
96
/* Internal helpers defined later in this file */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(struct pthread_timer *timer);
/* Parameter name matches the definition (was "num" here, "quantity" there) */
static void read_pipe(struct pthread_timer *timer, unsigned int quantity);
101
102 /*!
103  * \brief Data for the timing thread
104  */
105 static struct {
106         pthread_t thread;
107         ast_mutex_t lock;
108         ast_cond_t cond;
109         unsigned int stop:1;
110 } timing_thread;
111
112 static int pthread_timer_open(void)
113 {
114         struct pthread_timer *timer;
115         int fd;
116
117         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
118                 errno = ENOMEM;
119                 return -1;
120         }
121
122         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
123         timer->state = TIMER_STATE_IDLE;
124
125         if (pipe(timer->pipe)) {
126                 ao2_ref(timer, -1);
127                 return -1;
128         }
129
130         ao2_lock(pthread_timers);
131         if (!ao2_container_count(pthread_timers)) {
132                 ast_mutex_lock(&timing_thread.lock);
133                 ast_cond_signal(&timing_thread.cond);
134                 ast_mutex_unlock(&timing_thread.lock);
135         }
136         ao2_link(pthread_timers, timer);
137         ao2_unlock(pthread_timers);
138
139         fd = timer->pipe[PIPE_READ];
140
141         ao2_ref(timer, -1);
142
143         return fd;
144 }
145
/*!
 * \brief Remove a timer from the container and release it.
 *
 * \param handle read fd returned by pthread_timer_open()
 *
 * Does nothing when no timer matches \a handle.
 */
static void pthread_timer_close(int handle)
{
	/* Lookup with unlinkobj set also removes the timer from the container */
	struct pthread_timer *timer = find_timer(handle, 1);

	if (timer) {
		/* Drop the reference the lookup handed us */
		ao2_ref(timer, -1);
	}
}
156
157 static int pthread_timer_set_rate(int handle, unsigned int rate)
158 {
159         struct pthread_timer *timer;
160
161         if (!(timer = find_timer(handle, 0))) {
162                 errno = EINVAL;
163                 return -1;
164         }
165
166         if (rate > MAX_RATE) {
167                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
168                                 "max rate of %d / sec\n", MAX_RATE);
169                 errno = EINVAL;
170                 return -1;
171         }
172
173         ao2_lock(timer);
174
175         if ((timer->rate = rate)) {
176                 timer->interval = roundf(1000.0 / ((float) rate));
177                 timer->start = ast_tvnow();
178                 timer->state = TIMER_STATE_TICKING;
179         } else {
180                 timer->interval = 0;
181                 timer->start = ast_tv(0, 0);
182                 timer->state = TIMER_STATE_IDLE;
183         }
184         timer->tick_count = 0;
185
186         ao2_unlock(timer);
187
188         ao2_ref(timer, -1);
189
190         return 0;
191 }
192
/*!
 * \brief Acknowledge ticks by draining them from the timer's pipe.
 *
 * \param handle read fd returned by pthread_timer_open()
 * \param quantity number of ticks to acknowledge; must be non-zero
 *
 * Does nothing when no timer matches \a handle.
 */
static void pthread_timer_ack(int handle, unsigned int quantity)
{
	struct pthread_timer *timer;

	ast_assert(quantity > 0);

	timer = find_timer(handle, 0);
	if (!timer) {
		return;
	}

	/* read_pipe() requires the timer to be locked */
	ao2_lock(timer);
	read_pipe(timer, quantity);
	ao2_unlock(timer);

	ao2_ref(timer, -1);
}
209
210 static int pthread_timer_enable_continuous(int handle)
211 {
212         struct pthread_timer *timer;
213
214         if (!(timer = find_timer(handle, 0))) {
215                 errno = EINVAL;
216                 return -1;
217         }
218
219         ao2_lock(timer);
220         if (!timer->continuous) {
221                 timer->continuous = 1;
222                 write_byte(timer);
223         }
224         ao2_unlock(timer);
225
226         ao2_ref(timer, -1);
227
228         return 0;
229 }
230
231 static int pthread_timer_disable_continuous(int handle)
232 {
233         struct pthread_timer *timer;
234
235         if (!(timer = find_timer(handle, 0))) {
236                 errno = EINVAL;
237                 return -1;
238         }
239
240         ao2_lock(timer);
241         if (timer->continuous) {
242                 timer->continuous = 0;
243                 read_pipe(timer, 1);
244         }
245         ao2_unlock(timer);
246
247         ao2_ref(timer, -1);
248
249         return 0;
250 }
251
252 static enum ast_timer_event pthread_timer_get_event(int handle)
253 {
254         struct pthread_timer *timer;
255         enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
256
257         if (!(timer = find_timer(handle, 0))) {
258                 return res;
259         }
260
261         ao2_lock(timer);
262         if (timer->continuous && timer->pending_ticks == 1) {
263                 res = AST_TIMING_EVENT_CONTINUOUS;
264         }
265         ao2_unlock(timer);
266
267         ao2_ref(timer, -1);
268
269         return res;
270 }
271
272 static unsigned int pthread_timer_get_max_rate(int handle)
273 {
274         return MAX_RATE;
275 }
276
277 static struct pthread_timer *find_timer(int handle, int unlinkobj)
278 {
279         struct pthread_timer *timer;
280         struct pthread_timer tmp_timer;
281         int flags = OBJ_POINTER;
282
283         tmp_timer.pipe[PIPE_READ] = handle;
284
285         if (unlinkobj) {
286                 flags |= OBJ_UNLINK;
287         }
288
289         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
290                 ast_assert(timer != NULL);
291                 return NULL;
292         }
293
294         return timer;
295 }
296
297 static void pthread_timer_destructor(void *obj)
298 {
299         struct pthread_timer *timer = obj;
300
301         if (timer->pipe[PIPE_READ] > -1) {
302                 close(timer->pipe[PIPE_READ]);
303                 timer->pipe[PIPE_READ] = -1;
304         }
305
306         if (timer->pipe[PIPE_WRITE] > -1) {
307                 close(timer->pipe[PIPE_WRITE]);
308                 timer->pipe[PIPE_WRITE] = -1;
309         }
310 }
311
312 /*!
313  * \note only PIPE_READ is guaranteed valid
314  */
315 static int pthread_timer_hash(const void *obj, const int flags)
316 {
317         const struct pthread_timer *timer = obj;
318
319         return timer->pipe[PIPE_READ];
320 }
321
322 /*!
323  * \note only PIPE_READ is guaranteed valid
324  */
325 static int pthread_timer_cmp(void *obj, void *arg, int flags)
326 {
327         struct pthread_timer *timer1 = obj, *timer2 = arg;
328
329         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
330 }
331
332 /*!
333  * \retval 0 no timer tick needed
334  * \retval non-zero write to the timing pipe needed
335  */
336 static int check_timer(struct pthread_timer *timer)
337 {
338         struct timeval now;
339
340         if (timer->state == TIMER_STATE_IDLE) {
341                 return 0;
342         }
343
344         now = ast_tvnow();
345
346         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
347                 timer->tick_count++;
348                 if (!timer->tick_count) {
349                         /* Handle overflow. */
350                         timer->start = now;
351                 }
352                 return 1;
353         }
354
355         return 0;
356 }
357
358 /*!
359  * \internal
360  * \pre timer is locked
361  */
362 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
363 {
364         int rd_fd = timer->pipe[PIPE_READ];
365         int pending_ticks = timer->pending_ticks;
366
367         ast_assert(quantity);
368
369         if (timer->continuous && pending_ticks) {
370                 pending_ticks--;
371         }
372
373         if (quantity > pending_ticks) {
374                 quantity = pending_ticks;
375         }
376
377         if (!quantity) {
378                 return;
379         }
380
381         do {
382                 unsigned char buf[1024];
383                 ssize_t res;
384                 struct pollfd pfd = {
385                         .fd = rd_fd,
386                         .events = POLLIN,
387                 };
388
389                 if (ast_poll(&pfd, 1, 0) != 1) {
390                         ast_debug(1, "Reading not available on timing pipe, "
391                                         "quantity: %u\n", quantity);
392                         break;
393                 }
394
395                 res = read(rd_fd, buf,
396                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
397
398                 if (res == -1) {
399                         if (errno == EAGAIN) {
400                                 continue;
401                         }
402                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
403                                         strerror(errno));
404                         break;
405                 }
406
407                 quantity -= res;
408                 timer->pending_ticks -= res;
409         } while (quantity);
410 }
411
412 /*!
413  * \internal
414  * \pre timer is locked
415  */
416 static void write_byte(struct pthread_timer *timer)
417 {
418         ssize_t res;
419         unsigned char x = 42;
420
421         do {
422                 res = write(timer->pipe[PIPE_WRITE], &x, 1);
423         } while (res == -1 && errno == EAGAIN);
424
425         if (res == -1) {
426                 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
427                                 strerror(errno));
428         } else {
429                 timer->pending_ticks++;
430         }
431 }
432
433 static int run_timer(void *obj, void *arg, int flags)
434 {
435         struct pthread_timer *timer = obj;
436
437         if (timer->state == TIMER_STATE_IDLE) {
438                 return 0;
439         }
440
441         ao2_lock(timer);
442         if (check_timer(timer)) {
443                 write_byte(timer);
444         }
445         ao2_unlock(timer);
446
447         return 0;
448 }
449
450 static void *do_timing(void *arg)
451 {
452         struct timeval next_wakeup = ast_tvnow();
453
454         while (!timing_thread.stop) {
455                 struct timespec ts = { 0, };
456
457                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
458
459                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
460
461                 ts.tv_sec = next_wakeup.tv_sec;
462                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
463
464                 ast_mutex_lock(&timing_thread.lock);
465                 if (!timing_thread.stop) {
466                         if (ao2_container_count(pthread_timers)) {
467                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
468                         } else {
469                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
470                         }
471                 }
472                 ast_mutex_unlock(&timing_thread.lock);
473         }
474
475         return NULL;
476 }
477
478 static int init_timing_thread(void)
479 {
480         ast_mutex_init(&timing_thread.lock);
481         ast_cond_init(&timing_thread.cond, NULL);
482
483         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
484                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
485                 return -1;
486         }
487
488         return 0;
489 }
490
491 static int load_module(void)
492 {
493         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
494                 pthread_timer_hash, pthread_timer_cmp))) {
495                 return AST_MODULE_LOAD_DECLINE;
496         }
497
498         if (init_timing_thread()) {
499                 ao2_ref(pthread_timers, -1);
500                 pthread_timers = NULL;
501                 return AST_MODULE_LOAD_DECLINE;
502         }
503
504         return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
505                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
506 }
507
508 static int unload_module(void)
509 {
510         int res;
511
512         ast_mutex_lock(&timing_thread.lock);
513         timing_thread.stop = 1;
514         ast_cond_signal(&timing_thread.cond);
515         ast_mutex_unlock(&timing_thread.lock);
516         pthread_join(timing_thread.thread, NULL);
517
518         if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
519                 ao2_ref(pthread_timers, -1);
520                 pthread_timers = NULL;
521         }
522
523         return res;
524 }
525 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
526                 .load = load_module,
527                 .unload = unload_module,
528                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
529                 );