Use poll() instead of select() in res_timing_pthread to avoid stack corruption.
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
29
30 #include <math.h>
31 #include <sys/select.h>
32
33 #include "asterisk/module.h"
34 #include "asterisk/timing.h"
35 #include "asterisk/utils.h"
36 #include "asterisk/astobj2.h"
37 #include "asterisk/time.h"
38 #include "asterisk/lock.h"
39 #include "asterisk/poll-compat.h"
40
41 static void *timing_funcs_handle;
42
43 static int pthread_timer_open(void);
44 static void pthread_timer_close(int handle);
45 static int pthread_timer_set_rate(int handle, unsigned int rate);
46 static void pthread_timer_ack(int handle, unsigned int quantity);
47 static int pthread_timer_enable_continuous(int handle);
48 static int pthread_timer_disable_continuous(int handle);
49 static enum ast_timer_event pthread_timer_get_event(int handle);
50 static unsigned int pthread_timer_get_max_rate(int handle);
51
52 static struct ast_timing_interface pthread_timing = {
53         .name = "pthread",
54         .priority = 0, /* use this as a last resort */
55         .timer_open = pthread_timer_open,
56         .timer_close = pthread_timer_close,
57         .timer_set_rate = pthread_timer_set_rate,
58         .timer_ack = pthread_timer_ack,
59         .timer_enable_continuous = pthread_timer_enable_continuous,
60         .timer_disable_continuous = pthread_timer_disable_continuous,
61         .timer_get_event = pthread_timer_get_event,
62         .timer_get_max_rate = pthread_timer_get_max_rate,
63 };
64
65 /* 1 tick / 10 ms */
66 #define MAX_RATE 100
67
68 static struct ao2_container *pthread_timers;
69 #define PTHREAD_TIMER_BUCKETS 563
70
71 enum {
72         PIPE_READ =  0,
73         PIPE_WRITE = 1
74 };
75
76 enum pthread_timer_state {
77         TIMER_STATE_IDLE,
78         TIMER_STATE_TICKING,
79 };
80
81 struct pthread_timer {
82         int pipe[2];
83         enum pthread_timer_state state;
84         unsigned int rate;
85         /*! Interval in ms for current rate */
86         unsigned int interval;
87         unsigned int tick_count;
88         unsigned int pending_ticks;
89         struct timeval start;
90         unsigned int continuous:1;
91 };
92
93 static void pthread_timer_destructor(void *obj);
94 static struct pthread_timer *find_timer(int handle, int unlinkobj);
95 static void write_byte(struct pthread_timer *timer);
96 static void read_pipe(struct pthread_timer *timer, unsigned int num);
97
98 /*!
99  * \brief Data for the timing thread
100  */
101 static struct {
102         pthread_t thread;
103         ast_mutex_t lock;
104         ast_cond_t cond;
105         unsigned int stop:1;
106 } timing_thread;
107
108 static int pthread_timer_open(void)
109 {
110         struct pthread_timer *timer;
111         int fd;
112
113         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
114                 errno = ENOMEM;
115                 return -1;
116         }
117
118         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
119         timer->state = TIMER_STATE_IDLE;
120
121         if (pipe(timer->pipe)) {
122                 ao2_ref(timer, -1);
123                 return -1;
124         }
125
126         ao2_lock(pthread_timers);
127         if (!ao2_container_count(pthread_timers)) {
128                 ast_mutex_lock(&timing_thread.lock);
129                 ast_cond_signal(&timing_thread.cond);
130                 ast_mutex_unlock(&timing_thread.lock);
131         }
132         ao2_link(pthread_timers, timer);
133         ao2_unlock(pthread_timers);
134
135         fd = timer->pipe[PIPE_READ];
136
137         ao2_ref(timer, -1);
138
139         return fd;
140 }
141
142 static void pthread_timer_close(int handle)
143 {
144         struct pthread_timer *timer;
145
146         if (!(timer = find_timer(handle, 1))) {
147                 return;
148         }
149
150         ao2_ref(timer, -1);
151 }
152
153 static int pthread_timer_set_rate(int handle, unsigned int rate)
154 {
155         struct pthread_timer *timer;
156
157         if (!(timer = find_timer(handle, 0))) {
158                 errno = EINVAL;
159                 return -1;
160         }
161
162         if (rate > MAX_RATE) {
163                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
164                                 "max rate of %d / sec\n", MAX_RATE);
165                 errno = EINVAL;
166                 return -1;
167         }
168
169         ao2_lock(timer);
170
171         if ((timer->rate = rate)) {
172                 timer->interval = roundf(1000.0 / ((float) rate));
173                 timer->start = ast_tvnow();
174                 timer->state = TIMER_STATE_TICKING;
175         } else {
176                 timer->interval = 0;
177                 timer->start = ast_tv(0, 0);
178                 timer->state = TIMER_STATE_IDLE;
179         }
180         timer->tick_count = 0;
181
182         ao2_unlock(timer);
183
184         ao2_ref(timer, -1);
185
186         return 0;
187 }
188
189 static void pthread_timer_ack(int handle, unsigned int quantity)
190 {
191         struct pthread_timer *timer;
192
193         ast_assert(quantity > 0);
194
195         if (!(timer = find_timer(handle, 0))) {
196                 return;
197         }
198
199         ao2_lock(timer);
200         read_pipe(timer, quantity);
201         ao2_unlock(timer);
202
203         ao2_ref(timer, -1);
204 }
205
206 static int pthread_timer_enable_continuous(int handle)
207 {
208         struct pthread_timer *timer;
209
210         if (!(timer = find_timer(handle, 0))) {
211                 errno = EINVAL;
212                 return -1;
213         }
214
215         ao2_lock(timer);
216         if (!timer->continuous) {
217                 timer->continuous = 1;
218                 write_byte(timer);
219         }
220         ao2_unlock(timer);
221
222         ao2_ref(timer, -1);
223
224         return 0;
225 }
226
227 static int pthread_timer_disable_continuous(int handle)
228 {
229         struct pthread_timer *timer;
230
231         if (!(timer = find_timer(handle, 0))) {
232                 errno = EINVAL;
233                 return -1;
234         }
235
236         ao2_lock(timer);
237         if (timer->continuous) {
238                 timer->continuous = 0;
239                 read_pipe(timer, 1);
240         }
241         ao2_unlock(timer);
242
243         ao2_ref(timer, -1);
244
245         return 0;
246 }
247
248 static enum ast_timer_event pthread_timer_get_event(int handle)
249 {
250         struct pthread_timer *timer;
251         enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
252
253         if (!(timer = find_timer(handle, 0))) {
254                 return res;
255         }
256
257         ao2_lock(timer);
258         if (timer->continuous && timer->pending_ticks == 1) {
259                 res = AST_TIMING_EVENT_CONTINUOUS;
260         }
261         ao2_unlock(timer);
262
263         ao2_ref(timer, -1);
264
265         return res;
266 }
267
268 static unsigned int pthread_timer_get_max_rate(int handle)
269 {
270         return MAX_RATE;
271 }
272
273 static struct pthread_timer *find_timer(int handle, int unlinkobj)
274 {
275         struct pthread_timer *timer;
276         struct pthread_timer tmp_timer;
277         int flags = OBJ_POINTER;
278
279         tmp_timer.pipe[PIPE_READ] = handle;
280
281         if (unlinkobj) {
282                 flags |= OBJ_UNLINK;
283         }
284
285         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
286                 ast_assert(timer != NULL);
287                 return NULL;
288         }
289
290         return timer;
291 }
292
293 static void pthread_timer_destructor(void *obj)
294 {
295         struct pthread_timer *timer = obj;
296
297         if (timer->pipe[PIPE_READ] > -1) {
298                 close(timer->pipe[PIPE_READ]);
299                 timer->pipe[PIPE_READ] = -1;
300         }
301
302         if (timer->pipe[PIPE_WRITE] > -1) {
303                 close(timer->pipe[PIPE_WRITE]);
304                 timer->pipe[PIPE_WRITE] = -1;
305         }
306 }
307
308 /*!
309  * \note only PIPE_READ is guaranteed valid
310  */
311 static int pthread_timer_hash(const void *obj, const int flags)
312 {
313         const struct pthread_timer *timer = obj;
314
315         return timer->pipe[PIPE_READ];
316 }
317
318 /*!
319  * \note only PIPE_READ is guaranteed valid
320  */
321 static int pthread_timer_cmp(void *obj, void *arg, int flags)
322 {
323         struct pthread_timer *timer1 = obj, *timer2 = arg;
324
325         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
326 }
327
328 /*!
329  * \retval 0 no timer tick needed
330  * \retval non-zero write to the timing pipe needed
331  */
332 static int check_timer(struct pthread_timer *timer)
333 {
334         struct timeval now;
335
336         if (timer->state == TIMER_STATE_IDLE) {
337                 return 0;
338         }
339
340         now = ast_tvnow();
341
342         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
343                 timer->tick_count++;
344                 if (!timer->tick_count) {
345                         /* Handle overflow. */
346                         timer->start = now;
347                 }
348                 return 1;
349         }
350
351         return 0;
352 }
353
354 /*!
355  * \internal
356  * \pre timer is locked
357  */
358 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
359 {
360         int rd_fd = timer->pipe[PIPE_READ];
361         int pending_ticks = timer->pending_ticks;
362
363         ast_assert(quantity);
364
365         if (timer->continuous && pending_ticks) {
366                 pending_ticks--;
367         }
368
369         if (quantity > pending_ticks) {
370                 quantity = pending_ticks;
371         }
372
373         if (!quantity) {
374                 return;
375         }
376
377         do {
378                 unsigned char buf[1024];
379                 ssize_t res;
380                 struct pollfd pfd = {
381                         .fd = rd_fd,
382                         .events = POLLIN,
383                 };
384
385                 if (ast_poll(&pfd, 1, 0) != 1) {
386                         ast_debug(1, "Reading not available on timing pipe, "
387                                         "quantity: %u\n", quantity);
388                         break;
389                 }
390
391                 res = read(rd_fd, buf,
392                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
393
394                 if (res == -1) {
395                         if (errno == EAGAIN) {
396                                 continue;
397                         }
398                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
399                                         strerror(errno));
400                         break;
401                 }
402
403                 quantity -= res;
404                 timer->pending_ticks -= res;
405         } while (quantity);
406 }
407
408 /*!
409  * \internal
410  * \pre timer is locked
411  */
412 static void write_byte(struct pthread_timer *timer)
413 {
414         ssize_t res;
415         unsigned char x = 42;
416
417         do {
418                 res = write(timer->pipe[PIPE_WRITE], &x, 1);
419         } while (res == -1 && errno == EAGAIN);
420
421         if (res == -1) {
422                 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
423                                 strerror(errno));
424         } else {
425                 timer->pending_ticks++;
426         }
427 }
428
429 static int run_timer(void *obj, void *arg, int flags)
430 {
431         struct pthread_timer *timer = obj;
432
433         if (timer->state == TIMER_STATE_IDLE) {
434                 return 0;
435         }
436
437         ao2_lock(timer);
438         if (check_timer(timer)) {
439                 write_byte(timer);
440         }
441         ao2_unlock(timer);
442
443         return 0;
444 }
445
446 static void *do_timing(void *arg)
447 {
448         struct timeval next_wakeup = ast_tvnow();
449
450         while (!timing_thread.stop) {
451                 struct timespec ts = { 0, };
452
453                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
454
455                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
456
457                 ts.tv_sec = next_wakeup.tv_sec;
458                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
459
460                 ast_mutex_lock(&timing_thread.lock);
461                 if (!timing_thread.stop) {
462                         if (ao2_container_count(pthread_timers)) {
463                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
464                         } else {
465                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
466                         }
467                 }
468                 ast_mutex_unlock(&timing_thread.lock);
469         }
470
471         return NULL;
472 }
473
474 static int init_timing_thread(void)
475 {
476         ast_mutex_init(&timing_thread.lock);
477         ast_cond_init(&timing_thread.cond, NULL);
478
479         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
480                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
481                 return -1;
482         }
483
484         return 0;
485 }
486
487 static int load_module(void)
488 {
489         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
490                 pthread_timer_hash, pthread_timer_cmp))) {
491                 return AST_MODULE_LOAD_DECLINE;
492         }
493
494         if (init_timing_thread()) {
495                 ao2_ref(pthread_timers, -1);
496                 pthread_timers = NULL;
497                 return AST_MODULE_LOAD_DECLINE;
498         }
499
500         return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
501                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
502 }
503
504 static int unload_module(void)
505 {
506         int res;
507
508         ast_mutex_lock(&timing_thread.lock);
509         timing_thread.stop = 1;
510         ast_cond_signal(&timing_thread.cond);
511         ast_mutex_unlock(&timing_thread.lock);
512         pthread_join(timing_thread.thread, NULL);
513
514         if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
515                 ao2_ref(pthread_timers, -1);
516                 pthread_timers = NULL;
517         }
518
519         return res;
520 }
521 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
522                 .load = load_module,
523                 .unload = unload_module,
524                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
525                 );