Merge "rtp_engine/res_rtp_asterisk: Fix RTP struct reentrancy crashes."
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25
26 /*** MODULEINFO
27         <support_level>extended</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include <stdbool.h>
33 #include <math.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36
37 #include "asterisk/module.h"
38 #include "asterisk/timing.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/astobj2.h"
41 #include "asterisk/time.h"
42 #include "asterisk/lock.h"
43
44 static void *timing_funcs_handle;
45
46 static void *pthread_timer_open(void);
47 static void pthread_timer_close(void *data);
48 static int pthread_timer_set_rate(void *data, unsigned int rate);
49 static int pthread_timer_ack(void *data, unsigned int quantity);
50 static int pthread_timer_enable_continuous(void *data);
51 static int pthread_timer_disable_continuous(void *data);
52 static enum ast_timer_event pthread_timer_get_event(void *data);
53 static unsigned int pthread_timer_get_max_rate(void *data);
54 static int pthread_timer_fd(void *data);
55
/*!
 * \brief Callback table describing this module's timer implementation.
 *
 * load_module() hands this table to ast_register_timing_interface(); each
 * member points at the matching pthread_timer_* function in this file.
 */
static struct ast_timing_interface pthread_timing = {
        .name = "pthread",
        .priority = 0, /* use this as a last resort */
        .timer_open = pthread_timer_open,
        .timer_close = pthread_timer_close,
        .timer_set_rate = pthread_timer_set_rate,
        .timer_ack = pthread_timer_ack,
        .timer_enable_continuous = pthread_timer_enable_continuous,
        .timer_disable_continuous = pthread_timer_disable_continuous,
        .timer_get_event = pthread_timer_get_event,
        .timer_get_max_rate = pthread_timer_get_max_rate,
        .timer_fd = pthread_timer_fd,
};
69
70 /* 1 tick / 10 ms */
71 #define MAX_RATE 100
72
73 static struct ao2_container *pthread_timers;
74 #define PTHREAD_TIMER_BUCKETS 563
75
/*! \brief Indexes into pthread_timer.pipe[], in the order pipe() fills them. */
enum {
        PIPE_READ =  0,  /*!< End handed out by pthread_timer_fd() and drained by unsignal_pipe() */
        PIPE_WRITE = 1   /*!< End written to by signal_pipe() */
};
80
/*! \brief Whether a timer is currently producing ticks. */
enum pthread_timer_state {
        /*! Rate is 0 (or never set); check_timer() produces no ticks. */
        TIMER_STATE_IDLE,
        /*! A non-zero rate has been set via pthread_timer_set_rate(). */
        TIMER_STATE_TICKING,
};
85
/*!
 * \brief State of one timer handed out by pthread_timer_open().
 *
 * Instances are ao2 objects; the functions in this file take the object's
 * ao2 lock before modifying the tick and signalling fields.
 */
struct pthread_timer {
        /*! Notification pipe, indexed by PIPE_READ / PIPE_WRITE; -1 when not open */
        int pipe[2];
        /*! Idle or ticking, as set by pthread_timer_set_rate() */
        enum pthread_timer_state state;
        /*! Requested rate in ticks per second; 0 means stopped */
        unsigned int rate;
        /*! Interval in ms for current rate */
        unsigned int interval;
        /*! Ticks generated since \c start */
        unsigned int tick_count;
        /*! Ticks generated by the timing thread and not yet acknowledged */
        unsigned int pending_ticks;
        /*! Reference time for tick_count; reset when the rate changes or tick_count wraps */
        struct timeval start;
        /*! Continuous mode is enabled */
        bool continuous:1;
        /*! A byte has been written to the pipe and not yet read back */
        bool pipe_signaled:1;
};
98
99 static void pthread_timer_destructor(void *obj);
100 static void signal_pipe(struct pthread_timer *timer);
101 static void unsignal_pipe(struct pthread_timer *timer);
102 static void ack_ticks(struct pthread_timer *timer, unsigned int num);
103
/*!
 * \brief Data for the timing thread
 *
 * A single background thread (do_timing()) services every timer in the
 * pthread_timers container.
 */
static struct {
        /*! Thread running do_timing(), created by init_timing_thread() */
        pthread_t thread;
        /*! Protects \c cond waits and the hand-off of \c stop */
        ast_mutex_t lock;
        /*! Signalled when the first timer is opened and when the thread must stop */
        ast_cond_t cond;
        /*! Set to ask do_timing() to exit its loop */
        unsigned int stop:1;
} timing_thread;
113
114 static void *pthread_timer_open(void)
115 {
116         struct pthread_timer *timer;
117         int i;
118
119         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
120                 errno = ENOMEM;
121                 return NULL;
122         }
123
124         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
125         timer->state = TIMER_STATE_IDLE;
126
127         if (pipe(timer->pipe)) {
128                 ao2_ref(timer, -1);
129                 return NULL;
130         }
131
132         for (i = 0; i < ARRAY_LEN(timer->pipe); ++i) {
133                 int flags = fcntl(timer->pipe[i], F_GETFL);
134                 flags |= O_NONBLOCK;
135                 fcntl(timer->pipe[i], F_SETFL, flags);
136         }
137         
138         ao2_lock(pthread_timers);
139         if (!ao2_container_count(pthread_timers)) {
140                 ast_mutex_lock(&timing_thread.lock);
141                 ast_cond_signal(&timing_thread.cond);
142                 ast_mutex_unlock(&timing_thread.lock);
143         }
144         ao2_link_flags(pthread_timers, timer, OBJ_NOLOCK);
145         ao2_unlock(pthread_timers);
146
147         return timer;
148 }
149
150 static void pthread_timer_close(void *data)
151 {
152         struct pthread_timer *timer = data;
153
154         ao2_unlink(pthread_timers, timer);
155         ao2_ref(timer, -1);
156 }
157
158 static int pthread_timer_set_rate(void *data, unsigned int rate)
159 {
160         struct pthread_timer *timer = data;
161
162         if (rate > MAX_RATE) {
163                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
164                                 "max rate of %d / sec\n", MAX_RATE);
165                 errno = EINVAL;
166                 return -1;
167         }
168
169         ao2_lock(timer);
170
171         if ((timer->rate = rate)) {
172                 timer->interval = roundf(1000.0 / ((float) rate));
173                 timer->start = ast_tvnow();
174                 timer->state = TIMER_STATE_TICKING;
175         } else {
176                 timer->interval = 0;
177                 timer->start = ast_tv(0, 0);
178                 timer->state = TIMER_STATE_IDLE;
179         }
180         timer->tick_count = 0;
181
182         ao2_unlock(timer);
183
184         return 0;
185 }
186
/*!
 * \brief Acknowledge ticks that the timer's owner has consumed.
 *
 * \param data the timer returned by pthread_timer_open()
 * \param quantity number of ticks to acknowledge; must be non-zero
 *
 * \retval 0 always
 */
static int pthread_timer_ack(void *data, unsigned int quantity)
{
        struct pthread_timer *acked = data;

        ast_assert(quantity > 0);

        /* ack_ticks() requires the timer to be locked by its caller. */
        ao2_lock(acked);
        ack_ticks(acked, quantity);
        ao2_unlock(acked);

        return 0;
}
199
200 static int pthread_timer_enable_continuous(void *data)
201 {
202         struct pthread_timer *timer = data;
203
204         ao2_lock(timer);
205         if (!timer->continuous) {
206                 timer->continuous = true;
207                 signal_pipe(timer);
208         }
209         ao2_unlock(timer);
210
211         return 0;
212 }
213
214 static int pthread_timer_disable_continuous(void *data)
215 {
216         struct pthread_timer *timer = data;
217
218         ao2_lock(timer);
219         if (timer->continuous) {
220                 timer->continuous = false;
221                 unsignal_pipe(timer);
222         }
223         ao2_unlock(timer);
224
225         return 0;
226 }
227
228 static enum ast_timer_event pthread_timer_get_event(void *data)
229 {
230         struct pthread_timer *timer = data;
231         enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
232
233         ao2_lock(timer);
234         if (timer->continuous) {
235                 res = AST_TIMING_EVENT_CONTINUOUS;
236         }
237         ao2_unlock(timer);
238
239         return res;
240 }
241
242 static unsigned int pthread_timer_get_max_rate(void *data)
243 {
244         return MAX_RATE;
245 }
246
247 static int pthread_timer_fd(void *data)
248 {
249         struct pthread_timer *timer = data;
250
251         return timer->pipe[PIPE_READ];
252 }
253
254 static void pthread_timer_destructor(void *obj)
255 {
256         struct pthread_timer *timer = obj;
257
258         if (timer->pipe[PIPE_READ] > -1) {
259                 close(timer->pipe[PIPE_READ]);
260                 timer->pipe[PIPE_READ] = -1;
261         }
262
263         if (timer->pipe[PIPE_WRITE] > -1) {
264                 close(timer->pipe[PIPE_WRITE]);
265                 timer->pipe[PIPE_WRITE] = -1;
266         }
267 }
268
269 /*!
270  * \note only PIPE_READ is guaranteed valid
271  */
272 static int pthread_timer_hash(const void *obj, const int flags)
273 {
274         const struct pthread_timer *timer = obj;
275
276         return timer->pipe[PIPE_READ];
277 }
278
279 /*!
280  * \note only PIPE_READ is guaranteed valid
281  */
282 static int pthread_timer_cmp(void *obj, void *arg, int flags)
283 {
284         struct pthread_timer *timer1 = obj, *timer2 = arg;
285
286         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
287 }
288
289 /*!
290  * \retval 0 no timer tick needed
291  * \retval non-zero write to the timing pipe needed
292  */
293 static int check_timer(struct pthread_timer *timer)
294 {
295         struct timeval now;
296
297         if (timer->state == TIMER_STATE_IDLE) {
298                 return 0;
299         }
300
301         now = ast_tvnow();
302
303         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
304                 timer->tick_count++;
305                 if (!timer->tick_count) {
306                         /* Handle overflow. */
307                         timer->start = now;
308                 }
309                 return 1;
310         }
311
312         return 0;
313 }
314
315 /*!
316  * \internal
317  * \pre timer is locked
318  */
319 static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
320 {
321         int pending_ticks = timer->pending_ticks;
322
323         ast_assert(quantity);
324
325         if (quantity > pending_ticks) {
326                 quantity = pending_ticks;
327         }
328
329         if (!quantity) {
330                 return;
331         }
332
333         timer->pending_ticks -= quantity;
334
335         if ((0 == timer->pending_ticks) && !timer->continuous) {
336                 unsignal_pipe(timer);
337         }
338 }
339
340 /*!
341  * \internal
342  * \pre timer is locked
343  */
344 static void signal_pipe(struct pthread_timer *timer)
345 {
346         ssize_t res;
347         unsigned char x = 42;
348
349         if (timer->pipe_signaled) {
350                 return;
351         }
352
353         res = write(timer->pipe[PIPE_WRITE], &x, 1);
354         if (-1 == res) {
355                 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
356                                 strerror(errno));
357         } else {
358                 timer->pipe_signaled = true;
359         }
360 }
361
362 /*!
363  * \internal
364  * \pre timer is locked
365  */
366 static void unsignal_pipe(struct pthread_timer *timer)
367 {
368         ssize_t res;
369         unsigned long buffer;
370
371         if (!timer->pipe_signaled) {
372                 return;
373         }
374
375         res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
376         if (-1 == res) {
377                 ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
378                                 strerror(errno));
379         } else {
380                 timer->pipe_signaled = false;
381         }
382 }
383
384 static int run_timer(void *obj, void *arg, int flags)
385 {
386         struct pthread_timer *timer = obj;
387
388         if (timer->state == TIMER_STATE_IDLE) {
389                 return 0;
390         }
391
392         ao2_lock(timer);
393         if (check_timer(timer)) {
394                 timer->pending_ticks++;
395                 signal_pipe(timer);
396         }
397         ao2_unlock(timer);
398
399         return 0;
400 }
401
402 static void *do_timing(void *arg)
403 {
404         struct timeval next_wakeup = ast_tvnow();
405
406         while (!timing_thread.stop) {
407                 struct timespec ts = { 0, };
408
409                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
410
411                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
412
413                 ts.tv_sec = next_wakeup.tv_sec;
414                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
415
416                 ast_mutex_lock(&timing_thread.lock);
417                 if (!timing_thread.stop) {
418                         if (ao2_container_count(pthread_timers)) {
419                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
420                         } else {
421                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
422                         }
423                 }
424                 ast_mutex_unlock(&timing_thread.lock);
425         }
426
427         return NULL;
428 }
429
430 static int init_timing_thread(void)
431 {
432         ast_mutex_init(&timing_thread.lock);
433         ast_cond_init(&timing_thread.cond, NULL);
434
435         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
436                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
437                 return -1;
438         }
439
440         return 0;
441 }
442
443 static int load_module(void)
444 {
445         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
446                 pthread_timer_hash, pthread_timer_cmp))) {
447                 return AST_MODULE_LOAD_DECLINE;
448         }
449
450         if (init_timing_thread()) {
451                 ao2_ref(pthread_timers, -1);
452                 pthread_timers = NULL;
453                 return AST_MODULE_LOAD_DECLINE;
454         }
455
456         return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
457                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
458 }
459
460 static int unload_module(void)
461 {
462         int res;
463
464         ast_mutex_lock(&timing_thread.lock);
465         timing_thread.stop = 1;
466         ast_cond_signal(&timing_thread.cond);
467         ast_mutex_unlock(&timing_thread.lock);
468         pthread_join(timing_thread.thread, NULL);
469
470         if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
471                 ao2_ref(pthread_timers, -1);
472                 pthread_timers = NULL;
473         }
474
475         return res;
476 }
/*
 * Module descriptor: names load_module()/unload_module() as the entry points
 * and requests the timing load priority.
 */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
        .support_level = AST_MODULE_SUPPORT_EXTENDED,
        .load = load_module,
        .unload = unload_module,
        .load_pri = AST_MODPRI_TIMING,
);