res_timing_pthread: Fix leaky pipes.
[asterisk/asterisk.git] / res / res_timing_pthread.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25
26 /*** MODULEINFO
27         <support_level>extended</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include <stdbool.h>
35 #include <math.h>
36 #include <unistd.h>
37 #include <fcntl.h>
38
39 #include "asterisk/module.h"
40 #include "asterisk/timing.h"
41 #include "asterisk/utils.h"
42 #include "asterisk/astobj2.h"
43 #include "asterisk/time.h"
44 #include "asterisk/lock.h"
45
/*! Handle returned by ast_register_timing_interface(); passed back when unregistering. */
static void *timing_funcs_handle;

/* Timer lifetime */
static void *pthread_timer_open(void);
static void pthread_timer_close(void *data);

/* Timer configuration and acknowledgement */
static int pthread_timer_set_rate(void *data, unsigned int rate);
static int pthread_timer_ack(void *data, unsigned int quantity);
static int pthread_timer_enable_continuous(void *data);
static int pthread_timer_disable_continuous(void *data);

/* Timer queries */
static enum ast_timer_event pthread_timer_get_event(void *data);
static unsigned int pthread_timer_get_max_rate(void *data);
static int pthread_timer_fd(void *data);
57
58 static struct ast_timing_interface pthread_timing = {
59         .name = "pthread",
60         .priority = 0, /* use this as a last resort */
61         .timer_open = pthread_timer_open,
62         .timer_close = pthread_timer_close,
63         .timer_set_rate = pthread_timer_set_rate,
64         .timer_ack = pthread_timer_ack,
65         .timer_enable_continuous = pthread_timer_enable_continuous,
66         .timer_disable_continuous = pthread_timer_disable_continuous,
67         .timer_get_event = pthread_timer_get_event,
68         .timer_get_max_rate = pthread_timer_get_max_rate,
69         .timer_fd = pthread_timer_fd,
70 };
71
/*! Highest rate a timer may be set to, in ticks per second (1 tick / 10 ms). */
#define MAX_RATE 100

/*! Bucket count used when allocating the pthread_timers container. */
#define PTHREAD_TIMER_BUCKETS 563

/*! Every open timer; walked by the timing thread on each pass. */
static struct ao2_container *pthread_timers;
77
/*! Indexes into pthread_timer.pipe, in the order filled in by pipe(). */
enum {
        PIPE_READ,      /*!< end handed out by pthread_timer_fd() */
        PIPE_WRITE      /*!< end written to by signal_pipe() */
};
82
/*! Whether a timer currently produces ticks. */
enum pthread_timer_state {
        TIMER_STATE_IDLE = 0,           /*!< rate is 0; the timing thread skips the timer */
        TIMER_STATE_TICKING = 1,        /*!< a non-zero rate has been set */
};
87
88 struct pthread_timer {
89         int pipe[2];
90         enum pthread_timer_state state;
91         unsigned int rate;
92         /*! Interval in ms for current rate */
93         unsigned int interval;
94         unsigned int tick_count;
95         unsigned int pending_ticks;
96         struct timeval start;
97         bool continuous:1;
98         bool pipe_signaled:1;
99 };
100
/* ao2 destructor for struct pthread_timer */
static void pthread_timer_destructor(void *obj);

/* Internal helpers; each expects the timer to be locked by the caller */
static void ack_ticks(struct pthread_timer *timer, unsigned int num);
static void signal_pipe(struct pthread_timer *timer);
static void unsignal_pipe(struct pthread_timer *timer);
105
106 /*!
107  * \brief Data for the timing thread
108  */
109 static struct {
110         pthread_t thread;
111         ast_mutex_t lock;
112         ast_cond_t cond;
113         unsigned int stop:1;
114 } timing_thread;
115
116 static void *pthread_timer_open(void)
117 {
118         struct pthread_timer *timer;
119         int i;
120
121         if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
122                 errno = ENOMEM;
123                 return NULL;
124         }
125
126         timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
127         timer->state = TIMER_STATE_IDLE;
128
129         if (pipe(timer->pipe)) {
130                 ao2_ref(timer, -1);
131                 return NULL;
132         }
133
134         for (i = 0; i < ARRAY_LEN(timer->pipe); ++i) {
135                 int flags = fcntl(timer->pipe[i], F_GETFL);
136                 flags |= O_NONBLOCK;
137                 fcntl(timer->pipe[i], F_SETFL, flags);
138         }
139         
140         ao2_lock(pthread_timers);
141         if (!ao2_container_count(pthread_timers)) {
142                 ast_mutex_lock(&timing_thread.lock);
143                 ast_cond_signal(&timing_thread.cond);
144                 ast_mutex_unlock(&timing_thread.lock);
145         }
146         ao2_link_flags(pthread_timers, timer, OBJ_NOLOCK);
147         ao2_unlock(pthread_timers);
148
149         return timer;
150 }
151
152 static void pthread_timer_close(void *data)
153 {
154         struct pthread_timer *timer = data;
155
156         ao2_unlink(pthread_timers, timer);
157         ao2_ref(timer, -1);
158 }
159
160 static int pthread_timer_set_rate(void *data, unsigned int rate)
161 {
162         struct pthread_timer *timer = data;
163
164         if (rate > MAX_RATE) {
165                 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
166                                 "max rate of %d / sec\n", MAX_RATE);
167                 errno = EINVAL;
168                 return -1;
169         }
170
171         ao2_lock(timer);
172
173         if ((timer->rate = rate)) {
174                 timer->interval = roundf(1000.0 / ((float) rate));
175                 timer->start = ast_tvnow();
176                 timer->state = TIMER_STATE_TICKING;
177         } else {
178                 timer->interval = 0;
179                 timer->start = ast_tv(0, 0);
180                 timer->state = TIMER_STATE_IDLE;
181         }
182         timer->tick_count = 0;
183
184         ao2_unlock(timer);
185
186         return 0;
187 }
188
/*!
 * \brief Acknowledge ticks on a timer.
 *
 * \param data timer returned by pthread_timer_open()
 * \param quantity number of ticks being acknowledged; must be non-zero
 *
 * \return always 0
 */
static int pthread_timer_ack(void *data, unsigned int quantity)
{
        struct pthread_timer *acked = data;

        ast_assert(quantity > 0);

        /* ack_ticks() requires the timer to be locked. */
        ao2_lock(acked);
        ack_ticks(acked, quantity);
        ao2_unlock(acked);

        return 0;
}
201
202 static int pthread_timer_enable_continuous(void *data)
203 {
204         struct pthread_timer *timer = data;
205
206         ao2_lock(timer);
207         if (!timer->continuous) {
208                 timer->continuous = true;
209                 signal_pipe(timer);
210         }
211         ao2_unlock(timer);
212
213         return 0;
214 }
215
216 static int pthread_timer_disable_continuous(void *data)
217 {
218         struct pthread_timer *timer = data;
219
220         ao2_lock(timer);
221         if (timer->continuous) {
222                 timer->continuous = false;
223                 unsignal_pipe(timer);
224         }
225         ao2_unlock(timer);
226
227         return 0;
228 }
229
230 static enum ast_timer_event pthread_timer_get_event(void *data)
231 {
232         struct pthread_timer *timer = data;
233         enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
234
235         ao2_lock(timer);
236         if (timer->continuous) {
237                 res = AST_TIMING_EVENT_CONTINUOUS;
238         }
239         ao2_unlock(timer);
240
241         return res;
242 }
243
244 static unsigned int pthread_timer_get_max_rate(void *data)
245 {
246         return MAX_RATE;
247 }
248
249 static int pthread_timer_fd(void *data)
250 {
251         struct pthread_timer *timer = data;
252
253         return timer->pipe[PIPE_READ];
254 }
255
256 static void pthread_timer_destructor(void *obj)
257 {
258         struct pthread_timer *timer = obj;
259
260         if (timer->pipe[PIPE_READ] > -1) {
261                 close(timer->pipe[PIPE_READ]);
262                 timer->pipe[PIPE_READ] = -1;
263         }
264
265         if (timer->pipe[PIPE_WRITE] > -1) {
266                 close(timer->pipe[PIPE_WRITE]);
267                 timer->pipe[PIPE_WRITE] = -1;
268         }
269 }
270
271 /*!
272  * \note only PIPE_READ is guaranteed valid
273  */
274 static int pthread_timer_hash(const void *obj, const int flags)
275 {
276         const struct pthread_timer *timer = obj;
277
278         return timer->pipe[PIPE_READ];
279 }
280
281 /*!
282  * \note only PIPE_READ is guaranteed valid
283  */
284 static int pthread_timer_cmp(void *obj, void *arg, int flags)
285 {
286         struct pthread_timer *timer1 = obj, *timer2 = arg;
287
288         return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
289 }
290
291 /*!
292  * \retval 0 no timer tick needed
293  * \retval non-zero write to the timing pipe needed
294  */
295 static int check_timer(struct pthread_timer *timer)
296 {
297         struct timeval now;
298
299         if (timer->state == TIMER_STATE_IDLE) {
300                 return 0;
301         }
302
303         now = ast_tvnow();
304
305         if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
306                 timer->tick_count++;
307                 if (!timer->tick_count) {
308                         /* Handle overflow. */
309                         timer->start = now;
310                 }
311                 return 1;
312         }
313
314         return 0;
315 }
316
317 /*!
318  * \internal
319  * \pre timer is locked
320  */
321 static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
322 {
323         int pending_ticks = timer->pending_ticks;
324
325         ast_assert(quantity);
326
327         if (quantity > pending_ticks) {
328                 quantity = pending_ticks;
329         }
330
331         if (!quantity) {
332                 return;
333         }
334
335         timer->pending_ticks -= quantity;
336
337         if ((0 == timer->pending_ticks) && !timer->continuous) {
338                 unsignal_pipe(timer);
339         }
340 }
341
342 /*!
343  * \internal
344  * \pre timer is locked
345  */
346 static void signal_pipe(struct pthread_timer *timer)
347 {
348         ssize_t res;
349         unsigned char x = 42;
350
351         if (timer->pipe_signaled) {
352                 return;
353         }
354
355         res = write(timer->pipe[PIPE_WRITE], &x, 1);
356         if (-1 == res) {
357                 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
358                                 strerror(errno));
359         } else {
360                 timer->pipe_signaled = true;
361         }
362 }
363
364 /*!
365  * \internal
366  * \pre timer is locked
367  */
368 static void unsignal_pipe(struct pthread_timer *timer)
369 {
370         ssize_t res;
371         unsigned long buffer;
372
373         if (!timer->pipe_signaled) {
374                 return;
375         }
376
377         res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
378         if (-1 == res) {
379                 ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
380                                 strerror(errno));
381         } else {
382                 timer->pipe_signaled = false;
383         }
384 }
385
386 static int run_timer(void *obj, void *arg, int flags)
387 {
388         struct pthread_timer *timer = obj;
389
390         if (timer->state == TIMER_STATE_IDLE) {
391                 return 0;
392         }
393
394         ao2_lock(timer);
395         if (check_timer(timer)) {
396                 timer->pending_ticks++;
397                 signal_pipe(timer);
398         }
399         ao2_unlock(timer);
400
401         return 0;
402 }
403
404 static void *do_timing(void *arg)
405 {
406         struct timeval next_wakeup = ast_tvnow();
407
408         while (!timing_thread.stop) {
409                 struct timespec ts = { 0, };
410
411                 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
412
413                 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
414
415                 ts.tv_sec = next_wakeup.tv_sec;
416                 ts.tv_nsec = next_wakeup.tv_usec * 1000;
417
418                 ast_mutex_lock(&timing_thread.lock);
419                 if (!timing_thread.stop) {
420                         if (ao2_container_count(pthread_timers)) {
421                                 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
422                         } else {
423                                 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
424                         }
425                 }
426                 ast_mutex_unlock(&timing_thread.lock);
427         }
428
429         return NULL;
430 }
431
432 static int init_timing_thread(void)
433 {
434         ast_mutex_init(&timing_thread.lock);
435         ast_cond_init(&timing_thread.cond, NULL);
436
437         if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
438                 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
439                 return -1;
440         }
441
442         return 0;
443 }
444
445 static int load_module(void)
446 {
447         if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
448                 pthread_timer_hash, pthread_timer_cmp))) {
449                 return AST_MODULE_LOAD_DECLINE;
450         }
451
452         if (init_timing_thread()) {
453                 ao2_ref(pthread_timers, -1);
454                 pthread_timers = NULL;
455                 return AST_MODULE_LOAD_DECLINE;
456         }
457
458         return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
459                 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
460 }
461
462 static int unload_module(void)
463 {
464         int res;
465
466         ast_mutex_lock(&timing_thread.lock);
467         timing_thread.stop = 1;
468         ast_cond_signal(&timing_thread.cond);
469         ast_mutex_unlock(&timing_thread.lock);
470         pthread_join(timing_thread.thread, NULL);
471
472         if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
473                 ao2_ref(pthread_timers, -1);
474                 pthread_timers = NULL;
475         }
476
477         return res;
478 }
479 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
480                 .support_level = AST_MODULE_SUPPORT_EXTENDED,
481                 .load = load_module,
482                 .unload = unload_module,
483                 .load_pri = AST_MODPRI_TIMING,
484                 );