* at the top of the source tree.
*/
-/*!
+/*!
* \file
* \author Russell Bryant <russell@digium.com>
*
- * \brief pthread timing interface
+ * \brief pthread timing interface
*/
/*** MODULEINFO
- <conflict>res_timing_timerfd</conflict>
- <conflict>res_timing_dahdi</conflict>
+ <support_level>extended</support_level>
***/
#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
+ASTERISK_REGISTER_FILE();
+#include <stdbool.h>
#include <math.h>
-#include <sys/select.h>
+#include <unistd.h>
+#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/timing.h"
static void *timing_funcs_handle;
-static int pthread_timer_open(void);
-static void pthread_timer_close(int handle);
-static int pthread_timer_set_rate(int handle, unsigned int rate);
-static void pthread_timer_ack(int handle, unsigned int quantity);
-static int pthread_timer_enable_continuous(int handle);
-static int pthread_timer_disable_continuous(int handle);
-static enum ast_timing_event pthread_timer_get_event(int handle);
-static unsigned int pthread_timer_get_max_rate(int handle);
-
-static struct ast_timing_functions pthread_timing_functions = {
+static void *pthread_timer_open(void);
+static void pthread_timer_close(void *data);
+static int pthread_timer_set_rate(void *data, unsigned int rate);
+static int pthread_timer_ack(void *data, unsigned int quantity);
+static int pthread_timer_enable_continuous(void *data);
+static int pthread_timer_disable_continuous(void *data);
+static enum ast_timer_event pthread_timer_get_event(void *data);
+static unsigned int pthread_timer_get_max_rate(void *data);
+static int pthread_timer_fd(void *data);
+
+static struct ast_timing_interface pthread_timing = {
+ .name = "pthread",
+ .priority = 0, /* use this as a last resort */
.timer_open = pthread_timer_open,
.timer_close = pthread_timer_close,
.timer_set_rate = pthread_timer_set_rate,
.timer_disable_continuous = pthread_timer_disable_continuous,
.timer_get_event = pthread_timer_get_event,
.timer_get_max_rate = pthread_timer_get_max_rate,
+ .timer_fd = pthread_timer_fd,
};
/* 1 tick / 10 ms */
enum pthread_timer_state {
TIMER_STATE_IDLE,
TIMER_STATE_TICKING,
- TIMER_STATE_CONTINUOUS,
};
struct pthread_timer {
/*! Interval in ms for current rate */
unsigned int interval;
unsigned int tick_count;
+ unsigned int pending_ticks;
struct timeval start;
+ bool continuous:1;
+ bool pipe_signaled:1;
};
static void pthread_timer_destructor(void *obj);
-static struct pthread_timer *find_timer(int handle, int unlinkobj);
-static void write_byte(int wr_fd);
-static void read_pipe(int rd_fd, unsigned int num, int clear);
+static void signal_pipe(struct pthread_timer *timer);
+static void unsignal_pipe(struct pthread_timer *timer);
+static void ack_ticks(struct pthread_timer *timer, unsigned int num);
/*!
* \brief Data for the timing thread
unsigned int stop:1;
} timing_thread;
-static int pthread_timer_open(void)
+static void *pthread_timer_open(void)
{
struct pthread_timer *timer;
- int fd;
+ int i;
if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
errno = ENOMEM;
- return -1;
+ return NULL;
}
timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
if (pipe(timer->pipe)) {
ao2_ref(timer, -1);
- return -1;
+ return NULL;
}
+ for (i = 0; i < ARRAY_LEN(timer->pipe); ++i) {
+ int flags = fcntl(timer->pipe[i], F_GETFL);
+ flags |= O_NONBLOCK;
+ fcntl(timer->pipe[i], F_SETFL, flags);
+ }
+
ao2_lock(pthread_timers);
if (!ao2_container_count(pthread_timers)) {
ast_mutex_lock(&timing_thread.lock);
ast_cond_signal(&timing_thread.cond);
ast_mutex_unlock(&timing_thread.lock);
}
- ao2_link(pthread_timers, timer);
+ ao2_link_flags(pthread_timers, timer, OBJ_NOLOCK);
ao2_unlock(pthread_timers);
- fd = timer->pipe[PIPE_READ];
-
- ao2_ref(timer, -1);
-
- return fd;
+ return timer;
}
-static void pthread_timer_close(int handle)
+static void pthread_timer_close(void *data)
{
- struct pthread_timer *timer;
-
- if (!(timer = find_timer(handle, 1))) {
- return;
- }
+ struct pthread_timer *timer = data;
+ ao2_unlink(pthread_timers, timer);
ao2_ref(timer, -1);
}
-static void set_state(struct pthread_timer *timer)
+static int pthread_timer_set_rate(void *data, unsigned int rate)
{
- unsigned int rate = timer->rate;
+ struct pthread_timer *timer = data;
- if (rate) {
- timer->state = TIMER_STATE_TICKING;
+ if (rate > MAX_RATE) {
+ ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
+ "max rate of %d / sec\n", MAX_RATE);
+ errno = EINVAL;
+ return -1;
+ }
+
+ ao2_lock(timer);
+
+ if ((timer->rate = rate)) {
timer->interval = roundf(1000.0 / ((float) rate));
timer->start = ast_tvnow();
+ timer->state = TIMER_STATE_TICKING;
} else {
- timer->state = TIMER_STATE_IDLE;
timer->interval = 0;
timer->start = ast_tv(0, 0);
+ timer->state = TIMER_STATE_IDLE;
}
-
timer->tick_count = 0;
-}
-
-static int pthread_timer_set_rate(int handle, unsigned int rate)
-{
- struct pthread_timer *timer;
- if (!(timer = find_timer(handle, 0))) {
- errno = EINVAL;
- return -1;
- }
-
- if (rate > MAX_RATE) {
- ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
- MAX_RATE);
- errno = EINVAL;
- return -1;
- }
-
- ao2_lock(timer);
- timer->rate = rate;
- if (timer->state != TIMER_STATE_CONTINUOUS) {
- set_state(timer);
- }
-
ao2_unlock(timer);
- ao2_ref(timer, -1);
-
return 0;
}
-static void pthread_timer_ack(int handle, unsigned int quantity)
+static int pthread_timer_ack(void *data, unsigned int quantity)
{
- struct pthread_timer *timer;
+ struct pthread_timer *timer = data;
ast_assert(quantity > 0);
- if (!(timer = find_timer(handle, 0))) {
- return;
- }
-
- if (timer->state == TIMER_STATE_CONTINUOUS) {
- /* Leave the pipe alone, please! */
- return;
- }
-
- read_pipe(timer->pipe[PIPE_READ], quantity, 0);
+ ao2_lock(timer);
+ ack_ticks(timer, quantity);
+ ao2_unlock(timer);
- ao2_ref(timer, -1);
+ return 0;
}
-static int pthread_timer_enable_continuous(int handle)
+static int pthread_timer_enable_continuous(void *data)
{
- struct pthread_timer *timer;
-
- if (!(timer = find_timer(handle, 0))) {
- errno = EINVAL;
- return -1;
- }
+ struct pthread_timer *timer = data;
ao2_lock(timer);
- timer->state = TIMER_STATE_CONTINUOUS;
- write_byte(timer->pipe[PIPE_WRITE]);
+ if (!timer->continuous) {
+ timer->continuous = true;
+ signal_pipe(timer);
+ }
ao2_unlock(timer);
- ao2_ref(timer, -1);
-
return 0;
}
-static int pthread_timer_disable_continuous(int handle)
+static int pthread_timer_disable_continuous(void *data)
{
- struct pthread_timer *timer;
-
- if (!(timer = find_timer(handle, 0))) {
- errno = EINVAL;
- return -1;
- }
+ struct pthread_timer *timer = data;
ao2_lock(timer);
- set_state(timer);
- read_pipe(timer->pipe[PIPE_READ], 0, 1);
+ if (timer->continuous) {
+ timer->continuous = false;
+ unsignal_pipe(timer);
+ }
ao2_unlock(timer);
- ao2_ref(timer, -1);
-
return 0;
}
-static enum ast_timing_event pthread_timer_get_event(int handle)
+static enum ast_timer_event pthread_timer_get_event(void *data)
{
- struct pthread_timer *timer;
- enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
-
- if (!(timer = find_timer(handle, 0))) {
- return res;
- }
+ struct pthread_timer *timer = data;
+ enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
- if (timer->state == TIMER_STATE_CONTINUOUS) {
+ ao2_lock(timer);
+ if (timer->continuous) {
res = AST_TIMING_EVENT_CONTINUOUS;
}
-
- ao2_ref(timer, -1);
+ ao2_unlock(timer);
return res;
}
-static unsigned int pthread_timer_get_max_rate(int handle)
+static unsigned int pthread_timer_get_max_rate(void *data)
{
return MAX_RATE;
}
-static struct pthread_timer *find_timer(int handle, int unlinkobj)
+static int pthread_timer_fd(void *data)
{
- struct pthread_timer *timer;
- struct pthread_timer tmp_timer;
- int flags = OBJ_POINTER;
-
- tmp_timer.pipe[PIPE_READ] = handle;
-
- if (unlinkobj) {
- flags |= OBJ_UNLINK;
- }
+ struct pthread_timer *timer = data;
- if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
- ast_assert(timer != NULL);
- return NULL;
- }
-
- return timer;
+ return timer->pipe[PIPE_READ];
}
static void pthread_timer_destructor(void *obj)
}
/*!
- * \note only PIPE_READ is guaranteed valid
+ * \note only PIPE_READ is guaranteed valid
*/
static int pthread_timer_hash(const void *obj, const int flags)
{
}
/*!
- * \note only PIPE_READ is guaranteed valid
+ * \note only PIPE_READ is guaranteed valid
*/
static int pthread_timer_cmp(void *obj, void *arg, int flags)
{
{
struct timeval now;
- if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
- return 0;
+ if (timer->state == TIMER_STATE_IDLE) {
+ return 0;
}
-
+
now = ast_tvnow();
if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
timer->tick_count++;
if (!timer->tick_count) {
+ /* Handle overflow. */
timer->start = now;
}
return 1;
return 0;
}
-static void read_pipe(int rd_fd, unsigned int quantity, int clear)
+/*!
+ * \internal
+ * \pre timer is locked
+ */
+static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
{
- ast_assert(quantity || clear);
+ int pending_ticks = timer->pending_ticks;
- if (!quantity && clear) {
- quantity = 1;
- }
+ ast_assert(quantity);
- do {
- unsigned char buf[1024];
- ssize_t res;
- fd_set rfds;
- struct timeval timeout = {
- .tv_sec = 0,
- };
+ if (quantity > pending_ticks) {
+ quantity = pending_ticks;
+ }
- /* Make sure there is data to read */
- FD_ZERO(&rfds);
- FD_SET(rd_fd, &rfds);
+ if (!quantity) {
+ return;
+ }
- if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
- break;
- }
+ timer->pending_ticks -= quantity;
- res = read(rd_fd, buf,
- (quantity < sizeof(buf)) ? quantity : sizeof(buf));
+ if ((0 == timer->pending_ticks) && !timer->continuous) {
+ unsignal_pipe(timer);
+ }
+}
- if (res == -1) {
- if (errno == EAGAIN) {
- continue;
- }
- ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
- break;
- }
+/*!
+ * \internal
+ * \pre timer is locked
+ */
+static void signal_pipe(struct pthread_timer *timer)
+{
+ ssize_t res;
+ unsigned char x = 42;
- if (clear) {
- continue;
- }
+ if (timer->pipe_signaled) {
+ return;
+ }
- quantity -= res;
- } while (quantity);
+ res = write(timer->pipe[PIPE_WRITE], &x, 1);
+ if (-1 == res) {
+ ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
+ strerror(errno));
+ } else {
+ timer->pipe_signaled = true;
+ }
}
-static void write_byte(int wr_fd)
+/*!
+ * \internal
+ * \pre timer is locked
+ */
+static void unsignal_pipe(struct pthread_timer *timer)
{
- do {
- ssize_t res;
- unsigned char x = 42;
+ ssize_t res;
+ unsigned long buffer;
- res = write(wr_fd, &x, 1);
+ if (!timer->pipe_signaled) {
+ return;
+ }
- if (res == -1) {
- if (errno == EAGAIN) {
- continue;
- }
- ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
- }
- } while (0);
+ res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
+ if (-1 == res) {
+ ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
+ strerror(errno));
+ } else {
+ timer->pipe_signaled = false;
+ }
}
static int run_timer(void *obj, void *arg, int flags)
}
ao2_lock(timer);
-
if (check_timer(timer)) {
- write_byte(timer->pipe[PIPE_WRITE]);
+ timer->pending_ticks++;
+ signal_pipe(timer);
}
-
ao2_unlock(timer);
return 0;
static int load_module(void)
{
- if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
+ if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
pthread_timer_hash, pthread_timer_cmp))) {
return AST_MODULE_LOAD_DECLINE;
}
return AST_MODULE_LOAD_DECLINE;
}
- return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
+ return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
}
static int unload_module(void)
{
-#if 0
- /* XXX code to stop the timing thread ... */
+ int res;
- ast_uninstall_timing_functions(timing_funcs_handle);
- ao2_ref(pthread_timers, -1);
-#endif
+ ast_mutex_lock(&timing_thread.lock);
+ timing_thread.stop = 1;
+ ast_cond_signal(&timing_thread.cond);
+ ast_mutex_unlock(&timing_thread.lock);
+ pthread_join(timing_thread.thread, NULL);
- /* This module can not currently be unloaded. No use count handling is being done. */
+ if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
+ ao2_ref(pthread_timers, -1);
+ pthread_timers = NULL;
+ }
- return -1;
+ return res;
}
-
-AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
+ .support_level = AST_MODULE_SUPPORT_EXTENDED,
+ .load = load_module,
+ .unload = unload_module,
+ .load_pri = AST_MODPRI_TIMING,
+);