res_fax.c: Add chan locked precondition comments.
[asterisk/asterisk.git] / res / res_timing_pthread.c
index 0afe9c9..6476e74 100644 (file)
  * at the top of the source tree.
  */
 
-/*! 
+/*!
  * \file
  * \author Russell Bryant <russell@digium.com>
  *
- * \brief pthread timing interface 
+ * \brief pthread timing interface
  */
 
 /*** MODULEINFO
-       <conflict>res_timing_timerfd</conflict>
-       <conflict>res_timing_dahdi</conflict>
+       <support_level>extended</support_level>
  ***/
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+ASTERISK_REGISTER_FILE();
 
+#include <stdbool.h>
 #include <math.h>
-#include <sys/select.h>
+#include <unistd.h>
+#include <fcntl.h>
 
 #include "asterisk/module.h"
 #include "asterisk/timing.h"
@@ -44,16 +45,19 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 
 static void *timing_funcs_handle;
 
-static int pthread_timer_open(void);
-static void pthread_timer_close(int handle);
-static int pthread_timer_set_rate(int handle, unsigned int rate);
-static void pthread_timer_ack(int handle, unsigned int quantity);
-static int pthread_timer_enable_continuous(int handle);
-static int pthread_timer_disable_continuous(int handle);
-static enum ast_timing_event pthread_timer_get_event(int handle);
-static unsigned int pthread_timer_get_max_rate(int handle);
-
-static struct ast_timing_functions pthread_timing_functions = {
+static void *pthread_timer_open(void);
+static void pthread_timer_close(void *data);
+static int pthread_timer_set_rate(void *data, unsigned int rate);
+static int pthread_timer_ack(void *data, unsigned int quantity);
+static int pthread_timer_enable_continuous(void *data);
+static int pthread_timer_disable_continuous(void *data);
+static enum ast_timer_event pthread_timer_get_event(void *data);
+static unsigned int pthread_timer_get_max_rate(void *data);
+static int pthread_timer_fd(void *data);
+
+static struct ast_timing_interface pthread_timing = {
+       .name = "pthread",
+       .priority = 0, /* use this as a last resort */
        .timer_open = pthread_timer_open,
        .timer_close = pthread_timer_close,
        .timer_set_rate = pthread_timer_set_rate,
@@ -62,6 +66,7 @@ static struct ast_timing_functions pthread_timing_functions = {
        .timer_disable_continuous = pthread_timer_disable_continuous,
        .timer_get_event = pthread_timer_get_event,
        .timer_get_max_rate = pthread_timer_get_max_rate,
+       .timer_fd = pthread_timer_fd,
 };
 
 /* 1 tick / 10 ms */
@@ -78,7 +83,6 @@ enum {
 enum pthread_timer_state {
        TIMER_STATE_IDLE,
        TIMER_STATE_TICKING,
-       TIMER_STATE_CONTINUOUS,
 };
 
 struct pthread_timer {
@@ -88,13 +92,16 @@ struct pthread_timer {
        /*! Interval in ms for current rate */
        unsigned int interval;
        unsigned int tick_count;
+       unsigned int pending_ticks;
        struct timeval start;
+       bool continuous:1;
+       bool pipe_signaled:1;
 };
 
 static void pthread_timer_destructor(void *obj);
-static struct pthread_timer *find_timer(int handle, int unlinkobj);
-static void write_byte(int wr_fd);
-static void read_pipe(int rd_fd, unsigned int num, int clear);
+static void signal_pipe(struct pthread_timer *timer);
+static void unsignal_pipe(struct pthread_timer *timer);
+static void ack_ticks(struct pthread_timer *timer, unsigned int num);
 
 /*!
  * \brief Data for the timing thread
@@ -106,14 +113,14 @@ static struct {
        unsigned int stop:1;
 } timing_thread;
 
-static int pthread_timer_open(void)
+static void *pthread_timer_open(void)
 {
        struct pthread_timer *timer;
-       int fd;
+       int i;
 
        if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
                errno = ENOMEM;
-               return -1;
+               return NULL;
        }
 
        timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
@@ -121,181 +128,129 @@ static int pthread_timer_open(void)
 
        if (pipe(timer->pipe)) {
                ao2_ref(timer, -1);
-               return -1;
+               return NULL;
        }
 
+       for (i = 0; i < ARRAY_LEN(timer->pipe); ++i) {
+               int flags = fcntl(timer->pipe[i], F_GETFL);
+               flags |= O_NONBLOCK;
+               fcntl(timer->pipe[i], F_SETFL, flags);
+       }
+       
        ao2_lock(pthread_timers);
        if (!ao2_container_count(pthread_timers)) {
                ast_mutex_lock(&timing_thread.lock);
                ast_cond_signal(&timing_thread.cond);
                ast_mutex_unlock(&timing_thread.lock);
        }
-       ao2_link(pthread_timers, timer);
+       ao2_link_flags(pthread_timers, timer, OBJ_NOLOCK);
        ao2_unlock(pthread_timers);
 
-       fd = timer->pipe[PIPE_READ];
-
-       ao2_ref(timer, -1);
-
-       return fd;
+       return timer;
 }
 
-static void pthread_timer_close(int handle)
+static void pthread_timer_close(void *data)
 {
-       struct pthread_timer *timer;
-
-       if (!(timer = find_timer(handle, 1))) {
-               return;
-       }
+       struct pthread_timer *timer = data;
 
+       ao2_unlink(pthread_timers, timer);
        ao2_ref(timer, -1);
 }
 
-static void set_state(struct pthread_timer *timer)
+static int pthread_timer_set_rate(void *data, unsigned int rate)
 {
-       unsigned int rate = timer->rate;
+       struct pthread_timer *timer = data;
 
-       if (rate) {
-               timer->state = TIMER_STATE_TICKING;
+       if (rate > MAX_RATE) {
+               ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
+                               "max rate of %d / sec\n", MAX_RATE);
+               errno = EINVAL;
+               return -1;
+       }
+
+       ao2_lock(timer);
+
+       if ((timer->rate = rate)) {
                timer->interval = roundf(1000.0 / ((float) rate));
                timer->start = ast_tvnow();
+               timer->state = TIMER_STATE_TICKING;
        } else {
-               timer->state = TIMER_STATE_IDLE;
                timer->interval = 0;
                timer->start = ast_tv(0, 0);
+               timer->state = TIMER_STATE_IDLE;
        }
-
        timer->tick_count = 0;
-}
-
-static int pthread_timer_set_rate(int handle, unsigned int rate)
-{
-       struct pthread_timer *timer;
 
-       if (!(timer = find_timer(handle, 0))) {
-               errno = EINVAL;
-               return -1;
-       }
-
-       if (rate > MAX_RATE) {
-               ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
-                       MAX_RATE);
-               errno = EINVAL;
-               return -1;
-       }
-
-       ao2_lock(timer);
-       timer->rate = rate;
-       if (timer->state != TIMER_STATE_CONTINUOUS) {
-               set_state(timer);
-       }
-       
        ao2_unlock(timer);
 
-       ao2_ref(timer, -1);
-
        return 0;
 }
 
-static void pthread_timer_ack(int handle, unsigned int quantity)
+static int pthread_timer_ack(void *data, unsigned int quantity)
 {
-       struct pthread_timer *timer;
+       struct pthread_timer *timer = data;
 
        ast_assert(quantity > 0);
 
-       if (!(timer = find_timer(handle, 0))) {
-               return;
-       }
-
-       if (timer->state == TIMER_STATE_CONTINUOUS) {
-               /* Leave the pipe alone, please! */
-               return;
-       }
-
-       read_pipe(timer->pipe[PIPE_READ], quantity, 0);
+       ao2_lock(timer);
+       ack_ticks(timer, quantity);
+       ao2_unlock(timer);
 
-       ao2_ref(timer, -1);
+       return 0;
 }
 
-static int pthread_timer_enable_continuous(int handle)
+static int pthread_timer_enable_continuous(void *data)
 {
-       struct pthread_timer *timer;
-
-       if (!(timer = find_timer(handle, 0))) {
-               errno = EINVAL;
-               return -1;
-       }
+       struct pthread_timer *timer = data;
 
        ao2_lock(timer);
-       timer->state = TIMER_STATE_CONTINUOUS;
-       write_byte(timer->pipe[PIPE_WRITE]);
+       if (!timer->continuous) {
+               timer->continuous = true;
+               signal_pipe(timer);
+       }
        ao2_unlock(timer);
 
-       ao2_ref(timer, -1);
-
        return 0;
 }
 
-static int pthread_timer_disable_continuous(int handle)
+static int pthread_timer_disable_continuous(void *data)
 {
-       struct pthread_timer *timer;
-
-       if (!(timer = find_timer(handle, 0))) {
-               errno = EINVAL;
-               return -1;
-       }
+       struct pthread_timer *timer = data;
 
        ao2_lock(timer);
-       set_state(timer);
-       read_pipe(timer->pipe[PIPE_READ], 0, 1);
+       if (timer->continuous) {
+               timer->continuous = false;
+               unsignal_pipe(timer);
+       }
        ao2_unlock(timer);
 
-       ao2_ref(timer, -1);
-
        return 0;
 }
 
-static enum ast_timing_event pthread_timer_get_event(int handle)
+static enum ast_timer_event pthread_timer_get_event(void *data)
 {
-       struct pthread_timer *timer;
-       enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
-
-       if (!(timer = find_timer(handle, 0))) {
-               return res;
-       }
+       struct pthread_timer *timer = data;
+       enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
 
-       if (timer->state == TIMER_STATE_CONTINUOUS) {
+       ao2_lock(timer);
+       if (timer->continuous) {
                res = AST_TIMING_EVENT_CONTINUOUS;
        }
-
-       ao2_ref(timer, -1);
+       ao2_unlock(timer);
 
        return res;
 }
 
-static unsigned int pthread_timer_get_max_rate(int handle)
+static unsigned int pthread_timer_get_max_rate(void *data)
 {
        return MAX_RATE;
 }
 
-static struct pthread_timer *find_timer(int handle, int unlinkobj)
+static int pthread_timer_fd(void *data)
 {
-       struct pthread_timer *timer;
-       struct pthread_timer tmp_timer;
-       int flags = OBJ_POINTER;
-
-       tmp_timer.pipe[PIPE_READ] = handle;
-
-       if (unlinkobj) {
-               flags |= OBJ_UNLINK;
-       }
+       struct pthread_timer *timer = data;
 
-       if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
-               ast_assert(timer != NULL);
-               return NULL;
-       }
-
-       return timer;
+       return timer->pipe[PIPE_READ];
 }
 
 static void pthread_timer_destructor(void *obj)
@@ -314,7 +269,7 @@ static void pthread_timer_destructor(void *obj)
 }
 
 /*!
- * \note only PIPE_READ is guaranteed valid 
+ * \note only PIPE_READ is guaranteed valid
  */
 static int pthread_timer_hash(const void *obj, const int flags)
 {
@@ -324,7 +279,7 @@ static int pthread_timer_hash(const void *obj, const int flags)
 }
 
 /*!
- * \note only PIPE_READ is guaranteed valid 
+ * \note only PIPE_READ is guaranteed valid
  */
 static int pthread_timer_cmp(void *obj, void *arg, int flags)
 {
@@ -341,15 +296,16 @@ static int check_timer(struct pthread_timer *timer)
 {
        struct timeval now;
 
-       if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
-               return 0;       
+       if (timer->state == TIMER_STATE_IDLE) {
+               return 0;
        }
-       
+
        now = ast_tvnow();
 
        if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
                timer->tick_count++;
                if (!timer->tick_count) {
+                       /* Handle overflow. */
                        timer->start = now;
                }
                return 1;
@@ -358,64 +314,73 @@ static int check_timer(struct pthread_timer *timer)
        return 0;
 }
 
-static void read_pipe(int rd_fd, unsigned int quantity, int clear)
+/*!
+ * \internal
+ * \pre timer is locked
+ */
+static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
 {
-       ast_assert(quantity || clear);
+       int pending_ticks = timer->pending_ticks;
 
-       if (!quantity && clear) {
-               quantity = 1;
-       }
+       ast_assert(quantity);
 
-       do {
-               unsigned char buf[1024];
-               ssize_t res;
-               fd_set rfds;
-               struct timeval timeout = {
-                       .tv_sec = 0,
-               };
+       if (quantity > pending_ticks) {
+               quantity = pending_ticks;
+       }
 
-               /* Make sure there is data to read */
-               FD_ZERO(&rfds);
-               FD_SET(rd_fd, &rfds);
+       if (!quantity) {
+               return;
+       }
 
-               if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
-                       break;
-               }
+       timer->pending_ticks -= quantity;
 
-               res = read(rd_fd, buf, 
-                       (quantity < sizeof(buf)) ? quantity : sizeof(buf));
+       if ((0 == timer->pending_ticks) && !timer->continuous) {
+               unsignal_pipe(timer);
+       }
+}
 
-               if (res == -1) {
-                       if (errno == EAGAIN) {
-                               continue;
-                       }
-                       ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
-                       break;
-               }
+/*!
+ * \internal
+ * \pre timer is locked
+ */
+static void signal_pipe(struct pthread_timer *timer)
+{
+       ssize_t res;
+       unsigned char x = 42;
 
-               if (clear) {
-                       continue;
-               }
+       if (timer->pipe_signaled) {
+               return;
+       }
 
-               quantity -= res;
-       } while (quantity);
+       res = write(timer->pipe[PIPE_WRITE], &x, 1);
+       if (-1 == res) {
+               ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
+                               strerror(errno));
+       } else {
+               timer->pipe_signaled = true;
+       }
 }
 
-static void write_byte(int wr_fd)
+/*!
+ * \internal
+ * \pre timer is locked
+ */
+static void unsignal_pipe(struct pthread_timer *timer)
 {
-       do {
-               ssize_t res;
-               unsigned char x = 42;
+       ssize_t res;
+       unsigned long buffer;
 
-               res = write(wr_fd, &x, 1); 
+       if (!timer->pipe_signaled) {
+               return;
+       }
 
-               if (res == -1) {
-                       if (errno == EAGAIN) {
-                               continue;
-                       }
-                       ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
-               }
-       } while (0);
+       res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
+       if (-1 == res) {
+               ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
+                               strerror(errno));
+       } else {
+               timer->pipe_signaled = false;
+       }
 }
 
 static int run_timer(void *obj, void *arg, int flags)
@@ -427,11 +392,10 @@ static int run_timer(void *obj, void *arg, int flags)
        }
 
        ao2_lock(timer);
-
        if (check_timer(timer)) {
-               write_byte(timer->pipe[PIPE_WRITE]);
+               timer->pending_ticks++;
+               signal_pipe(timer);
        }
-       
        ao2_unlock(timer);
 
        return 0;
@@ -480,7 +444,7 @@ static int init_timing_thread(void)
 
 static int load_module(void)
 {
-       if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
+       if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
                pthread_timer_hash, pthread_timer_cmp))) {
                return AST_MODULE_LOAD_DECLINE;
        }
@@ -491,22 +455,30 @@ static int load_module(void)
                return AST_MODULE_LOAD_DECLINE;
        }
 
-       return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
+       return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
                AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
 }
 
 static int unload_module(void)
 {
-#if 0
-       /* XXX code to stop the timing thread ... */
+       int res;
 
-       ast_uninstall_timing_functions(timing_funcs_handle);
-       ao2_ref(pthread_timers, -1);
-#endif
+       ast_mutex_lock(&timing_thread.lock);
+       timing_thread.stop = 1;
+       ast_cond_signal(&timing_thread.cond);
+       ast_mutex_unlock(&timing_thread.lock);
+       pthread_join(timing_thread.thread, NULL);
 
-       /* This module can not currently be unloaded.  No use count handling is being done. */
+       if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
+               ao2_ref(pthread_timers, -1);
+               pthread_timers = NULL;
+       }
 
-       return -1;
+       return res;
 }
-
-AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
+       .support_level = AST_MODULE_SUPPORT_EXTENDED,
+       .load = load_module,
+       .unload = unload_module,
+       .load_pri = AST_MODPRI_TIMING,
+);