module load priority
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
29
30 #include <math.h>
31 #include <sys/select.h>
32
33 #include "asterisk/module.h"
34 #include "asterisk/timing.h"
35 #include "asterisk/utils.h"
36 #include "asterisk/astobj2.h"
37 #include "asterisk/time.h"
38 #include "asterisk/lock.h"
39
40 static void *timing_funcs_handle;
41
42 static int pthread_timer_open(void);
43 static void pthread_timer_close(int handle);
44 static int pthread_timer_set_rate(int handle, unsigned int rate);
45 static void pthread_timer_ack(int handle, unsigned int quantity);
46 static int pthread_timer_enable_continuous(int handle);
47 static int pthread_timer_disable_continuous(int handle);
48 static enum ast_timer_event pthread_timer_get_event(int handle);
49 static unsigned int pthread_timer_get_max_rate(int handle);
50
51 static struct ast_timing_interface pthread_timing = {
52         .name = "pthread",
53         .priority = 0, /* use this as a last resort */
54         .timer_open = pthread_timer_open,
55         .timer_close = pthread_timer_close,
56         .timer_set_rate = pthread_timer_set_rate,
57         .timer_ack = pthread_timer_ack,
58         .timer_enable_continuous = pthread_timer_enable_continuous,
59         .timer_disable_continuous = pthread_timer_disable_continuous,
60         .timer_get_event = pthread_timer_get_event,
61         .timer_get_max_rate = pthread_timer_get_max_rate,
62 };
63
64 /* 1 tick / 10 ms */
65 #define MAX_RATE 100
66
67 static struct ao2_container *pthread_timers;
68 #define PTHREAD_TIMER_BUCKETS 563
69
70 enum {
71         PIPE_READ =  0,
72         PIPE_WRITE = 1
73 };
74
75 enum pthread_timer_state {
76         TIMER_STATE_IDLE,
77         TIMER_STATE_TICKING,
78 };
79
80 struct pthread_timer {
81         int pipe[2];
82         enum pthread_timer_state state;
83         unsigned int rate;
84         /*! Interval in ms for current rate */
85         unsigned int interval;
86         unsigned int tick_count;
87         unsigned int pending_ticks;
88         struct timeval start;
89         unsigned int continuous:1;
90 };
91
92 static void pthread_timer_destructor(void *obj);
93 static struct pthread_timer *find_timer(int handle, int unlinkobj);
94 static void write_byte(struct pthread_timer *timer);
95 static void read_pipe(struct pthread_timer *timer, unsigned int num);
96
97 /*!
98  * \brief Data for the timing thread
99  */
100 static struct {
101         pthread_t thread;
102         ast_mutex_t lock;
103         ast_cond_t cond;
104         unsigned int stop:1;
105 } timing_thread;
106
107 static int pthread_timer_open(void)
108 {
109         struct pthread_timer *timer;
110         int fd;
111
112         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
113                 errno = ENOMEM;
114                 return -1;
115         }
116
117         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
118         timer->state = TIMER_STATE_IDLE;
119
120         if (pipe(timer->pipe)) {
121                 ao2_ref(timer, -1);
122                 return -1;
123         }
124
125         ao2_lock(pthread_timers);
126         if (!ao2_container_count(pthread_timers)) {
127                 ast_mutex_lock(&timing_thread.lock);
128                 ast_cond_signal(&timing_thread.cond);
129                 ast_mutex_unlock(&timing_thread.lock);
130         }
131         ao2_link(pthread_timers, timer);
132         ao2_unlock(pthread_timers);
133
134         fd = timer->pipe[PIPE_READ];
135
136         ao2_ref(timer, -1);
137
138         return fd;
139 }
140
141 static void pthread_timer_close(int handle)
142 {
143         struct pthread_timer *timer;
144
145         if (!(timer = find_timer(handle, 1))) {
146                 return;
147         }
148
149         ao2_ref(timer, -1);
150 }
151
152 static int pthread_timer_set_rate(int handle, unsigned int rate)
153 {
154         struct pthread_timer *timer;
155
156         if (!(timer = find_timer(handle, 0))) {
157                 errno = EINVAL;
158                 return -1;
159         }
160
161         if (rate > MAX_RATE) {
162                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
163                                 "max rate of %d / sec\n", MAX_RATE);
164                 errno = EINVAL;
165                 return -1;
166         }
167
168         ao2_lock(timer);
169
170         if ((timer->rate = rate)) {
171                 timer->interval = roundf(1000.0 / ((float) rate));
172                 timer->start = ast_tvnow();
173                 timer->state = TIMER_STATE_TICKING;
174         } else {
175                 timer->interval = 0;
176                 timer->start = ast_tv(0, 0);
177                 timer->state = TIMER_STATE_IDLE;
178         }
179         timer->tick_count = 0;
180
181         ao2_unlock(timer);
182
183         ao2_ref(timer, -1);
184
185         return 0;
186 }
187
188 static void pthread_timer_ack(int handle, unsigned int quantity)
189 {
190         struct pthread_timer *timer;
191
192         ast_assert(quantity > 0);
193
194         if (!(timer = find_timer(handle, 0))) {
195                 return;
196         }
197
198         ao2_lock(timer);
199         read_pipe(timer, quantity);
200         ao2_unlock(timer);
201
202         ao2_ref(timer, -1);
203 }
204
205 static int pthread_timer_enable_continuous(int handle)
206 {
207         struct pthread_timer *timer;
208
209         if (!(timer = find_timer(handle, 0))) {
210                 errno = EINVAL;
211                 return -1;
212         }
213
214         ao2_lock(timer);
215         if (!timer->continuous) {
216                 timer->continuous = 1;
217                 write_byte(timer);
218         }
219         ao2_unlock(timer);
220
221         ao2_ref(timer, -1);
222
223         return 0;
224 }
225
226 static int pthread_timer_disable_continuous(int handle)
227 {
228         struct pthread_timer *timer;
229
230         if (!(timer = find_timer(handle, 0))) {
231                 errno = EINVAL;
232                 return -1;
233         }
234
235         ao2_lock(timer);
236         if (timer->continuous) {
237                 timer->continuous = 0;
238                 read_pipe(timer, 1);
239         }
240         ao2_unlock(timer);
241
242         ao2_ref(timer, -1);
243
244         return 0;
245 }
246
247 static enum ast_timer_event pthread_timer_get_event(int handle)
248 {
249         struct pthread_timer *timer;
250         enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
251
252         if (!(timer = find_timer(handle, 0))) {
253                 return res;
254         }
255
256         ao2_lock(timer);
257         if (timer->continuous && timer->pending_ticks == 1) {
258                 res = AST_TIMING_EVENT_CONTINUOUS;
259         }
260         ao2_unlock(timer);
261
262         ao2_ref(timer, -1);
263
264         return res;
265 }
266
267 static unsigned int pthread_timer_get_max_rate(int handle)
268 {
269         return MAX_RATE;
270 }
271
272 static struct pthread_timer *find_timer(int handle, int unlinkobj)
273 {
274         struct pthread_timer *timer;
275         struct pthread_timer tmp_timer;
276         int flags = OBJ_POINTER;
277
278         tmp_timer.pipe[PIPE_READ] = handle;
279
280         if (unlinkobj) {
281                 flags |= OBJ_UNLINK;
282         }
283
284         if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
285                 ast_assert(timer != NULL);
286                 return NULL;
287         }
288
289         return timer;
290 }
291
292 static void pthread_timer_destructor(void *obj)
293 {
294         struct pthread_timer *timer = obj;
295
296         if (timer->pipe[PIPE_READ] > -1) {
297                 close(timer->pipe[PIPE_READ]);
298                 timer->pipe[PIPE_READ] = -1;
299         }
300
301         if (timer->pipe[PIPE_WRITE] > -1) {
302                 close(timer->pipe[PIPE_WRITE]);
303                 timer->pipe[PIPE_WRITE] = -1;
304         }
305 }
306
307 /*!
308  * \note only PIPE_READ is guaranteed valid
309  */
310 static int pthread_timer_hash(const void *obj, const int flags)
311 {
312         const struct pthread_timer *timer = obj;
313
314         return timer->pipe[PIPE_READ];
315 }
316
317 /*!
318  * \note only PIPE_READ is guaranteed valid
319  */
320 static int pthread_timer_cmp(void *obj, void *arg, int flags)
321 {
322         struct pthread_timer *timer1 = obj, *timer2 = arg;
323
324         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
325 }
326
327 /*!
328  * \retval 0 no timer tick needed
329  * \retval non-zero write to the timing pipe needed
330  */
331 static int check_timer(struct pthread_timer *timer)
332 {
333         struct timeval now;
334
335         if (timer->state == TIMER_STATE_IDLE) {
336                 return 0;
337         }
338
339         now = ast_tvnow();
340
341         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
342                 timer->tick_count++;
343                 if (!timer->tick_count) {
344                         /* Handle overflow. */
345                         timer->start = now;
346                 }
347                 return 1;
348         }
349
350         return 0;
351 }
352
353 /*!
354  * \internal
355  * \pre timer is locked
356  */
357 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
358 {
359         int rd_fd = timer->pipe[PIPE_READ];
360         int pending_ticks = timer->pending_ticks;
361
362         ast_assert(quantity);
363
364         if (timer->continuous && pending_ticks) {
365                 pending_ticks--;
366         }
367
368         if (quantity > pending_ticks) {
369                 quantity = pending_ticks;
370         }
371
372         if (!quantity) {
373                 return;
374         }
375
376         do {
377                 unsigned char buf[1024];
378                 ssize_t res;
379                 fd_set rfds;
380                 struct timeval timeout = {
381                         .tv_sec = 0,
382                 };
383
384                 /* Make sure there is data to read */
385                 FD_ZERO(&rfds);
386                 FD_SET(rd_fd, &rfds);
387
388                 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
389                         ast_debug(1, "Reading not available on timing pipe, "
390                                         "quantity: %u\n", quantity);
391                         break;
392                 }
393
394                 res = read(rd_fd, buf,
395                         (quantity < sizeof(buf)) ? quantity : sizeof(buf));
396
397                 if (res == -1) {
398                         if (errno == EAGAIN) {
399                                 continue;
400                         }
401                         ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
402                                         strerror(errno));
403                         break;
404                 }
405
406                 quantity -= res;
407                 timer->pending_ticks -= res;
408         } while (quantity);
409 }
410
411 /*!
412  * \internal
413  * \pre timer is locked
414  */
415 static void write_byte(struct pthread_timer *timer)
416 {
417         ssize_t res;
418         unsigned char x = 42;
419
420         do {
421                 res = write(timer->pipe[PIPE_WRITE], &x, 1);
422         } while (res == -1 && errno == EAGAIN);
423
424         if (res == -1) {
425                 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
426                                 strerror(errno));
427         } else {
428                 timer->pending_ticks++;
429         }
430 }
431
432 static int run_timer(void *obj, void *arg, int flags)
433 {
434         struct pthread_timer *timer = obj;
435
436         if (timer->state == TIMER_STATE_IDLE) {
437                 return 0;
438         }
439
440         ao2_lock(timer);
441         if (check_timer(timer)) {
442                 write_byte(timer);
443         }
444         ao2_unlock(timer);
445
446         return 0;
447 }
448
449 static void *do_timing(void *arg)
450 {
451         struct timeval next_wakeup = ast_tvnow();
452
453         while (!timing_thread.stop) {
454                 struct timespec ts = { 0, };
455
456                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
457
458                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
459
460                 ts.tv_sec = next_wakeup.tv_sec;
461                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
462
463                 ast_mutex_lock(&timing_thread.lock);
464                 if (!timing_thread.stop) {
465                         if (ao2_container_count(pthread_timers)) {
466                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
467                         } else {
468                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
469                         }
470                 }
471                 ast_mutex_unlock(&timing_thread.lock);
472         }
473
474         return NULL;
475 }
476
477 static int init_timing_thread(void)
478 {
479         ast_mutex_init(&timing_thread.lock);
480         ast_cond_init(&timing_thread.cond, NULL);
481
482         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
483                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
484                 return -1;
485         }
486
487         return 0;
488 }
489
490 static int load_module(void)
491 {
492         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
493                 pthread_timer_hash, pthread_timer_cmp))) {
494                 return AST_MODULE_LOAD_DECLINE;
495         }
496
497         if (init_timing_thread()) {
498                 ao2_ref(pthread_timers, -1);
499                 pthread_timers = NULL;
500                 return AST_MODULE_LOAD_DECLINE;
501         }
502
503         return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
504                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
505 }
506
507 static int unload_module(void)
508 {
509         int res;
510
511         ast_mutex_lock(&timing_thread.lock);
512         timing_thread.stop = 1;
513         ast_cond_signal(&timing_thread.cond);
514         ast_mutex_unlock(&timing_thread.lock);
515         pthread_join(timing_thread.thread, NULL);
516
517         if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
518                 ao2_ref(pthread_timers, -1);
519                 pthread_timers = NULL;
520         }
521
522         return res;
523 }
524 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
525                 .load = load_module,
526                 .unload = unload_module,
527                 .load_pri = 10,
528                 );