8eea1a0f9cb064be30d2fc8d0bb505384f693cf4
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
29
30 #include <math.h>
31 #include <sys/select.h>
32
33 #include "asterisk/module.h"
34 #include "asterisk/timing.h"
35 #include "asterisk/utils.h"
36 #include "asterisk/astobj2.h"
37 #include "asterisk/time.h"
38 #include "asterisk/lock.h"
39
/*! \brief Token returned by ast_register_timing_interface() in load_module(); handed back to ast_unregister_timing_interface() in unload_module() */
40 static void *timing_funcs_handle;
41
/* Forward declarations of the callbacks installed in pthread_timing below. */
42 static int pthread_timer_open(void);
43 static void pthread_timer_close(int handle);
44 static int pthread_timer_set_rate(int handle, unsigned int rate);
45 static void pthread_timer_ack(int handle, unsigned int quantity);
46 static int pthread_timer_enable_continuous(int handle);
47 static int pthread_timer_disable_continuous(int handle);
48 static enum ast_timer_event pthread_timer_get_event(int handle);
49 static unsigned int pthread_timer_get_max_rate(int handle);
50
/*!
 * \brief Timing interface description registered by load_module()
 *
 * Each callback takes the handle returned by pthread_timer_open(), which is
 * the read end of the timer's pipe.
 */
51 static struct ast_timing_interface pthread_timing = {
52         .name = "pthread",
53         .priority = 0, /* use this as a last resort */
54         .timer_open = pthread_timer_open,
55         .timer_close = pthread_timer_close,
56         .timer_set_rate = pthread_timer_set_rate,
57         .timer_ack = pthread_timer_ack,
58         .timer_enable_continuous = pthread_timer_enable_continuous,
59         .timer_disable_continuous = pthread_timer_disable_continuous,
60         .timer_get_event = pthread_timer_get_event,
61         .timer_get_max_rate = pthread_timer_get_max_rate,
62 };
63
64 /* Highest rate accepted by pthread_timer_set_rate(), in ticks per second: 1 tick / 10 ms */
65 #define MAX_RATE 100
66
/*! \brief Container of all open timers; hashed and compared on the read end of each timer's pipe */
67 static struct ao2_container *pthread_timers;
/* Bucket count passed to ao2_container_alloc() for pthread_timers */
68 #define PTHREAD_TIMER_BUCKETS 563
69
/*! \brief Indexes into pthread_timer.pipe[], in the order filled in by pipe() */
70 enum {
71         PIPE_READ =  0, /*!< End handed out as the timer handle and drained on ack */
72         PIPE_WRITE = 1  /*!< End written to when a tick is due */
73 };
74
/*! \brief Operating mode of a single timer */
75 enum pthread_timer_state {
76         TIMER_STATE_IDLE,       /*!< Rate is 0: the timing thread skips this timer */
77         TIMER_STATE_TICKING,    /*!< Non-zero rate: one byte is written per elapsed interval */
78         TIMER_STATE_CONTINUOUS, /*!< Continuous mode: a byte stays in the pipe until disabled */
79 };
80
/*!
 * \brief State of one open timer
 *
 * Allocated as an ao2 object in pthread_timer_open() and stored in the
 * pthread_timers container.
 */
81 struct pthread_timer {
82         int pipe[2]; /*!< Signalling pipe, indexed by PIPE_READ / PIPE_WRITE; -1 when not open */
83         enum pthread_timer_state state; /*!< Current operating mode */
84         unsigned int rate; /*!< Requested ticks per second; 0 means no ticking */
85         /*! Interval in ms for current rate */
86         unsigned int interval;
87         unsigned int tick_count; /*!< Ticks written since 'start'; reset by set_state() */
88         struct timeval start; /*!< Reference time from which elapsed ticks are computed */
89 };
90
/*
 * Forward declarations of internal helpers defined later in this file.
 * Parameter names match the definitions (read_pipe's second parameter is
 * 'quantity', the number of bytes to consume).
 */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(int wr_fd);
static void read_pipe(int rd_fd, unsigned int quantity, int clear);
95
96 /*!
97  * \brief Data for the timing thread
98  */
99 static struct {
100         pthread_t thread; /*!< Thread running do_timing(); created in init_timing_thread() */
101         ast_mutex_t lock; /*!< Protects 'cond' and the stop request */
102         ast_cond_t cond; /*!< Signalled when the first timer is opened or when stopping */
103         unsigned int stop:1; /*!< Set by unload_module() to make do_timing() return */
104 } timing_thread;
105
106 static int pthread_timer_open(void)
107 {
108         struct pthread_timer *timer;
109         int fd;
110
111         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
112                 errno = ENOMEM;
113                 return -1;
114         }
115
116         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
117         timer->state = TIMER_STATE_IDLE;
118
119         if (pipe(timer->pipe)) {
120                 ao2_ref(timer, -1);
121                 return -1;
122         }
123
124         ao2_lock(pthread_timers);
125         if (!ao2_container_count(pthread_timers)) {
126                 ast_mutex_lock(&timing_thread.lock);
127                 ast_cond_signal(&timing_thread.cond);
128                 ast_mutex_unlock(&timing_thread.lock);
129         }
130         ao2_link(pthread_timers, timer);
131         ao2_unlock(pthread_timers);
132
133         fd = timer->pipe[PIPE_READ];
134
135         ao2_ref(timer, -1);
136
137         return fd;
138 }
139
/*!
 * \brief Close a timer
 *
 * Unlinks the timer identified by \a handle from the container and drops the
 * reference returned by the lookup.
 */
static void pthread_timer_close(int handle)
{
        struct pthread_timer *timer = find_timer(handle, 1);

        if (timer) {
                ao2_ref(timer, -1);
        }
}
150
151 static void set_state(struct pthread_timer *timer)
152 {
153         unsigned int rate = timer->rate;
154
155         if (rate) {
156                 timer->state = TIMER_STATE_TICKING;
157                 timer->interval = roundf(1000.0 / ((float) rate));
158                 timer->start = ast_tvnow();
159         } else {
160                 timer->state = TIMER_STATE_IDLE;
161                 timer->interval = 0;
162                 timer->start = ast_tv(0, 0);
163         }
164
165         timer->tick_count = 0;
166 }
167
168 static int pthread_timer_set_rate(int handle, unsigned int rate)
169 {
170         struct pthread_timer *timer;
171
172         if (!(timer = find_timer(handle, 0))) {
173                 errno = EINVAL;
174                 return -1;
175         }
176
177         if (rate > MAX_RATE) {
178                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
179                         MAX_RATE);
180                 errno = EINVAL;
181                 return -1;
182         }
183
184         ao2_lock(timer);
185         timer->rate = rate;
186         if (timer->state != TIMER_STATE_CONTINUOUS) {
187                 set_state(timer);
188         }
189
190         ao2_unlock(timer);
191
192         ao2_ref(timer, -1);
193
194         return 0;
195 }
196
197 static void pthread_timer_ack(int handle, unsigned int quantity)
198 {
199         struct pthread_timer *timer;
200
201         ast_assert(quantity > 0);
202
203         if (!(timer = find_timer(handle, 0))) {
204                 return;
205         }
206
207         if (timer->state == TIMER_STATE_CONTINUOUS) {
208                 /* Leave the pipe alone, please! */
209                 return;
210         }
211
212         read_pipe(timer->pipe[PIPE_READ], quantity, 0);
213
214         ao2_ref(timer, -1);
215 }
216
217 static int pthread_timer_enable_continuous(int handle)
218 {
219         struct pthread_timer *timer;
220
221         if (!(timer = find_timer(handle, 0))) {
222                 errno = EINVAL;
223                 return -1;
224         }
225
226         ao2_lock(timer);
227         timer->state = TIMER_STATE_CONTINUOUS;
228         write_byte(timer->pipe[PIPE_WRITE]);
229         ao2_unlock(timer);
230
231         ao2_ref(timer, -1);
232
233         return 0;
234 }
235
236 static int pthread_timer_disable_continuous(int handle)
237 {
238         struct pthread_timer *timer;
239
240         if (!(timer = find_timer(handle, 0))) {
241                 errno = EINVAL;
242                 return -1;
243         }
244
245         ao2_lock(timer);
246         set_state(timer);
247         read_pipe(timer->pipe[PIPE_READ], 0, 1);
248         ao2_unlock(timer);
249
250         ao2_ref(timer, -1);
251
252         return 0;
253 }
254
255 static enum ast_timer_event pthread_timer_get_event(int handle)
256 {
257         struct pthread_timer *timer;
258         enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
259
260         if (!(timer = find_timer(handle, 0))) {
261                 return res;
262         }
263
264         if (timer->state == TIMER_STATE_CONTINUOUS) {
265                 res = AST_TIMING_EVENT_CONTINUOUS;
266         }
267
268         ao2_ref(timer, -1);
269
270         return res;
271 }
272
273 static unsigned int pthread_timer_get_max_rate(int handle)
274 {
275         return MAX_RATE;
276 }
277
278 static struct pthread_timer *find_timer(int handle, int unlinkobj)
279 {
280         struct pthread_timer *timer;
281         struct pthread_timer tmp_timer;
282         int flags = OBJ_POINTER;
283
284         tmp_timer.pipe[PIPE_READ] = handle;
285
286         if (unlinkobj) {
287                 flags |= OBJ_UNLINK;
288         }
289
290         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
291                 ast_assert(timer != NULL);
292                 return NULL;
293         }
294
295         return timer;
296 }
297
298 static void pthread_timer_destructor(void *obj)
299 {
300         struct pthread_timer *timer = obj;
301
302         if (timer->pipe[PIPE_READ] > -1) {
303                 close(timer->pipe[PIPE_READ]);
304                 timer->pipe[PIPE_READ] = -1;
305         }
306
307         if (timer->pipe[PIPE_WRITE] > -1) {
308                 close(timer->pipe[PIPE_WRITE]);
309                 timer->pipe[PIPE_WRITE] = -1;
310         }
311 }
312
313 /*!
314  * \note only PIPE_READ is guaranteed valid
315  */
316 static int pthread_timer_hash(const void *obj, const int flags)
317 {
318         const struct pthread_timer *timer = obj;
319
320         return timer->pipe[PIPE_READ];
321 }
322
323 /*!
324  * \note only PIPE_READ is guaranteed valid
325  */
326 static int pthread_timer_cmp(void *obj, void *arg, int flags)
327 {
328         struct pthread_timer *timer1 = obj, *timer2 = arg;
329
330         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
331 }
332
333 /*!
334  * \retval 0 no timer tick needed
335  * \retval non-zero write to the timing pipe needed
336  */
337 static int check_timer(struct pthread_timer *timer)
338 {
339         struct timeval now;
340
341         if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
342                 return 0;
343         }
344
345         now = ast_tvnow();
346
347         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
348                 timer->tick_count++;
349                 if (!timer->tick_count) {
350                         timer->start = now;
351                 }
352                 return 1;
353         }
354
355         return 0;
356 }
357
358 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
359 {
360         ast_assert(quantity || clear);
361
362         if (!quantity && clear) {
363                 quantity = 1;
364         }
365
366         do {
367                 unsigned char buf[1024];
368                 ssize_t res;
369                 fd_set rfds;
370                 struct timeval timeout = {
371                         .tv_sec = 0,
372                 };
373
374                 /* Make sure there is data to read */
375                 FD_ZERO(&rfds);
376                 FD_SET(rd_fd, &rfds);
377
378                 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
379                         break;
380                 }
381
382                 res = read(rd_fd, buf,
383                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
384
385                 if (res == -1) {
386                         if (errno == EAGAIN) {
387                                 continue;
388                         }
389                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
390                         break;
391                 }
392
393                 if (clear) {
394                         continue;
395                 }
396
397                 quantity -= res;
398         } while (quantity);
399 }
400
401 static void write_byte(int wr_fd)
402 {
403         do {
404                 ssize_t res;
405                 unsigned char x = 42;
406
407                 res = write(wr_fd, &x, 1);
408
409                 if (res == -1) {
410                         if (errno == EAGAIN) {
411                                 continue;
412                         }
413                         ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
414                 }
415         } while (0);
416 }
417
418 static int run_timer(void *obj, void *arg, int flags)
419 {
420         struct pthread_timer *timer = obj;
421
422         if (timer->state == TIMER_STATE_IDLE) {
423                 return 0;
424         }
425
426         ao2_lock(timer);
427
428         if (check_timer(timer)) {
429                 write_byte(timer->pipe[PIPE_WRITE]);
430         }
431
432         ao2_unlock(timer);
433
434         return 0;
435 }
436
437 static void *do_timing(void *arg)
438 {
439         struct timeval next_wakeup = ast_tvnow();
440
441         while (!timing_thread.stop) {
442                 struct timespec ts = { 0, };
443
444                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
445
446                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
447
448                 ts.tv_sec = next_wakeup.tv_sec;
449                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
450
451                 ast_mutex_lock(&timing_thread.lock);
452                 if (!timing_thread.stop) {
453                         if (ao2_container_count(pthread_timers)) {
454                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
455                         } else {
456                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
457                         }
458                 }
459                 ast_mutex_unlock(&timing_thread.lock);
460         }
461
462         return NULL;
463 }
464
465 static int init_timing_thread(void)
466 {
467         ast_mutex_init(&timing_thread.lock);
468         ast_cond_init(&timing_thread.cond, NULL);
469
470         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
471                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
472                 return -1;
473         }
474
475         return 0;
476 }
477
478 static int load_module(void)
479 {
480         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
481                 pthread_timer_hash, pthread_timer_cmp))) {
482                 return AST_MODULE_LOAD_DECLINE;
483         }
484
485         if (init_timing_thread()) {
486                 ao2_ref(pthread_timers, -1);
487                 pthread_timers = NULL;
488                 return AST_MODULE_LOAD_DECLINE;
489         }
490
491         return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
492                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
493 }
494
495 static int unload_module(void)
496 {
497         int res;
498
499         ast_mutex_lock(&timing_thread.lock);
500         timing_thread.stop = 1;
501         ast_cond_signal(&timing_thread.cond);
502         ast_mutex_unlock(&timing_thread.lock);
503         pthread_join(timing_thread.thread, NULL);
504
505         if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
506                 ao2_ref(pthread_timers, -1);
507                 pthread_timers = NULL;
508         }
509
510         return res;
511 }
512
/*
 * Module description and license key for the Asterisk module loader.
 * NOTE(review): presumably this macro picks up load_module() and
 * unload_module() above by name -- confirm against asterisk/module.h.
 */
513 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");