res/res_timing_kqueue: Update the module to conform to current timer API
authorMatthew Jordan <mjordan@digium.com>
Fri, 27 Mar 2015 14:41:46 +0000 (14:41 +0000)
committerMatthew Jordan <mjordan@digium.com>
Fri, 27 Mar 2015 14:41:46 +0000 (14:41 +0000)
This patch updates the kqueue timing module to conform to current timer API.

This fixes issues with using the kqueue timing source on Asterisk 13 on
FreeBSD 10. These issues include:

- Remove support for kevent64().  The values used to support Asterisk timers
  fit within 32bits and so can be handled on all platforms via kevent().

- Provide debug logging for, but do not track, unacked events.  This matches
  the behavior of all other timer implementations.

- Implement continuous mode by triggering and leaving active, a user event.
  This ensures that the file descriptor for the timer returns immediately from
  poll(), without placing the load of a high speed timer on the kernel.

- In kqueue_timer_get_max_rate(), don't overstate the capability of the timer.
  On some platforms, UINT_MAX is greater than INTPTR_MAX, the largest integer
  type kqueue supports for timers.

- In kqueue_timer_get_event(), assume the caller woke up from poll() and just
  return the mode the timer is currently in. This matches all other timer
  implementations.

- Adjust the test code now that unacked events are not tracked.

Review: https://reviewboard.asterisk.org/r/4465/

ASTERISK-24857 #close
Reported by: scsiguy
Tested by: Ed Hynan
patches:
  rb4465.patch submitted by scsiguy (License 6692)
........

Merged revisions 433574 from http://svn.asterisk.org/svn/asterisk/branches/13

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@433575 65c4cc65-6c06-0410-ace0-fbb531ad65f3

res/res_timing_kqueue.c

index 0c98064..17f9836 100644 (file)
@@ -73,15 +73,92 @@ static struct ast_timing_interface kqueue_timing = {
 };
 
 struct kqueue_timer {
+       intptr_t period;
        int handle;
-       uint64_t nsecs;
-       uint64_t unacked;
+#ifndef EVFILT_USER
+       int continuous_fd;
+       unsigned int continuous_fd_valid:1;
+#endif
        unsigned int is_continuous:1;
 };
 
+#ifdef EVFILT_USER
+#define CONTINUOUS_EVFILT_TYPE EVFILT_USER
+static int kqueue_timer_init_continuous_event(struct kqueue_timer *timer)
+{
+       return 0;
+}
+
+static int kqueue_timer_enable_continuous_event(struct kqueue_timer *timer)
+{
+       struct kevent kev[2];
+
+       EV_SET(&kev[0], (uintptr_t)timer, EVFILT_USER, EV_ADD | EV_ENABLE,
+               0, 0, NULL);
+       EV_SET(&kev[1], (uintptr_t)timer, EVFILT_USER, 0, NOTE_TRIGGER,
+               0, NULL);
+       return kevent(timer->handle, kev, 2, NULL, 0, NULL);
+}
+
+static int kqueue_timer_disable_continuous_event(struct kqueue_timer *timer)
+{
+       struct kevent kev;
+
+       EV_SET(&kev, (uintptr_t)timer, EVFILT_USER, EV_DELETE, 0, 0, NULL);
+       return kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+}
+
+static void kqueue_timer_fini_continuous_event(struct kqueue_timer *timer)
+{
+}
+
+#else /* EVFILT_USER */
+
+#define CONTINUOUS_EVFILT_TYPE EVFILT_READ
+static int kqueue_timer_init_continuous_event(struct kqueue_timer *timer)
+{
+       int pipefds[2];
+       int retval;
+
+       retval = pipe(pipefds);
+       if (retval == 0) {
+               timer->continuous_fd = pipefds[0];
+               timer->continuous_fd_valid = 1;
+               close(pipefds[1]);
+       }
+       return retval;
+}
+
+static void kqueue_timer_fini_continuous_event(struct kqueue_timer *timer)
+{
+       if (timer->continuous_fd_valid) {
+               close(timer->continuous_fd);
+       }
+}
+
+static int kqueue_timer_enable_continuous_event(struct kqueue_timer *timer)
+{
+       struct kevent kev;
+
+       EV_SET(&kev, timer->continuous_fd, EVFILT_READ, EV_ADD | EV_ENABLE,
+               0, 0, NULL);
+       return kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+}
+
+static int kqueue_timer_disable_continuous_event(struct kqueue_timer *timer)
+{
+       struct kevent kev;
+
+       EV_SET(&kev, timer->continuous_fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+       return kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+}
+#endif
+
 static void timer_destroy(void *obj)
 {
        struct kqueue_timer *timer = obj;
+       ast_debug(5, "[%d]: Timer Destroy\n", timer->handle);
+       kqueue_timer_fini_continuous_event(timer);
        close(timer->handle);
 }
 
@@ -90,15 +167,24 @@ static void *kqueue_timer_open(void)
        struct kqueue_timer *timer;
 
        if (!(timer = ao2_alloc(sizeof(*timer), timer_destroy))) {
-               ast_log(LOG_ERROR, "Could not allocate memory for kqueue_timer structure\n");
+               ast_log(LOG_ERROR, "Alloc failed for kqueue_timer structure\n");
                return NULL;
        }
+
        if ((timer->handle = kqueue()) < 0) {
-               ast_log(LOG_ERROR, "Failed to create kqueue timer: %s\n", strerror(errno));
+               ast_log(LOG_ERROR, "Failed to create kqueue fd: %s\n",
+                       strerror(errno));
                ao2_ref(timer, -1);
                return NULL;
        }
 
+       if (kqueue_timer_init_continuous_event(timer) != 0) {
+               ast_log(LOG_ERROR, "Failed to create continuous event: %s\n",
+                       strerror(errno));
+               ao2_ref(timer, -1);
+               return NULL;
+       }
+       ast_debug(5, "[%d]: Create timer\n", timer->handle);
        return timer;
 }
 
@@ -106,75 +192,151 @@ static void kqueue_timer_close(void *data)
 {
        struct kqueue_timer *timer = data;
 
+       ast_debug(5, "[%d]: Timer Close\n", timer->handle);
        ao2_ref(timer, -1);
 }
 
-static void kqueue_set_nsecs(struct kqueue_timer *our_timer, uint64_t nsecs)
+/*
+ * Use the highest precision available that does not overflow
+ * the datatype kevent is using for time.
+ */
+static intptr_t kqueue_scale_period(unsigned int period_ns, int *units)
 {
-       struct timespec nowait = { 0, 1 };
-#ifdef HAVE_KEVENT64
-       struct kevent64_s kev;
-
-       EV_SET64(&kev, our_timer->handle, EVFILT_TIMER, EV_ADD | EV_ENABLE, NOTE_NSECONDS,
-               nsecs, 0, 0, 0);
-       kevent64(our_timer->handle, &kev, 1, NULL, 0, 0, &nowait);
-#else
-       struct kevent kev;
-
-       EV_SET(&kev, our_timer->handle, EVFILT_TIMER, EV_ADD | EV_ENABLE,
-#ifdef NOTE_NSECONDS
-               nsecs <= 0xFFffFFff ? NOTE_NSECONDS :
-#endif
-#ifdef NOTE_USECONDS
-               NOTE_USECONDS
-#else /* Milliseconds, if no constants are defined */
-               0
-#endif
-               ,
+       uint64_t period = period_ns;
+       *units = 0;
 #ifdef NOTE_NSECONDS
-               nsecs <= 0xFFffFFff ? nsecs :
-#endif
+       if (period < INTPTR_MAX) {
+               *units = NOTE_NSECONDS;
+       } else {
 #ifdef NOTE_USECONDS
-       nsecs / 1000
-#else /* Milliseconds, if nothing else is defined */
-       nsecs / 1000000
-#endif
-       , NULL);
-       kevent(our_timer->handle, &kev, 1, NULL, 0, &nowait);
+               period /= 1000;
+               if (period < INTPTR_MAX) {
+                       *units = NOTE_USECONDS;
+               } else {
+                       period /= 1000;
+#ifdef NOTE_MSECONDS
+                       *units = NOTE_MSECONDS;
+#endif /* NOTE_MSECONDS */
+               }
+#else  /* NOTE_USECONDS */
+               period /= 1000000;
+#ifdef NOTE_MSECONDS
+               *units = NOTE_MSECONDS;
+#endif /* NOTE_MSECONDS */
+#endif /* NOTE_USECONDS */
+       }
+#else  /* NOTE_NSECONDS */
+       period /= 1000000;
 #endif
+       if (period > INTPTR_MAX) {
+               period = INTPTR_MAX;
+       }
+       return period;
 }
 
 static int kqueue_timer_set_rate(void *data, unsigned int rate)
 {
+       struct kevent kev;
        struct kqueue_timer *timer = data;
+       uint64_t period_ns;
+       int flags;
+       int units;
+       int retval;
 
-       kqueue_set_nsecs(timer, (timer->nsecs = rate ? (long) (1000000000 / rate) : 0L));
+       ao2_lock(timer);
+
+       if (rate == 0) {
+               if (timer->period == 0) {
+                       ao2_unlock(timer);
+                       return (0);
+               }
+               flags = EV_DELETE;
+               timer->period = 0;
+               units = 0;
+       } else  {
+               flags = EV_ADD | EV_ENABLE;
+               period_ns = (uint64_t)1000000000 / rate;
+               timer->period = kqueue_scale_period(period_ns, &units);
+       }
+       ast_debug(5, "[%d]: Set rate %u:%ju\n",
+               timer->handle, units, (uintmax_t)timer->period);
+       EV_SET(&kev, timer->handle, EVFILT_TIMER, flags, units,
+               timer->period, NULL);
+       retval = kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+
+       if (retval == -1) {
+               ast_log(LOG_ERROR, "[%d]: Error queing timer: %s\n",
+                       timer->handle, strerror(errno));
+       }
+
+       ao2_unlock(timer);
 
        return 0;
 }
 
 static int kqueue_timer_ack(void *data, unsigned int quantity)
 {
+       static struct timespec ts_nowait = { 0, 0 };
        struct kqueue_timer *timer = data;
+       struct kevent kev[2];
+       int i, retval;
+
+       ao2_lock(timer);
 
-       if (timer->unacked < quantity) {
-               ast_debug(1, "Acking more events than have expired?!!\n");
-               timer->unacked = 0;
+       retval = kevent(timer->handle, NULL, 0, kev, 2, &ts_nowait);
+       if (retval == -1) {
+               ast_log(LOG_ERROR, "[%d]: Error sampling kqueue: %s\n",
+                       timer->handle, strerror(errno));
+               ao2_unlock(timer);
                return -1;
-       } else {
-               timer->unacked -= quantity;
        }
 
+       for (i = 0; i < retval; i++) {
+               switch (kev[i].filter) {
+               case EVFILT_TIMER:
+                       if (kev[i].data > quantity) {
+                               ast_log(LOG_ERROR, "[%d]: Missed %ju\n",
+                                       timer->handle,
+                                       (uintmax_t)kev[i].data - quantity);
+                       }
+                       break;
+               case CONTINUOUS_EVFILT_TYPE:
+                       if (!timer->is_continuous) {
+                               ast_log(LOG_ERROR,
+                                       "[%d]: Spurious user event\n",
+                                       timer->handle);
+                       }
+                       break;
+               default:
+                       ast_log(LOG_ERROR, "[%d]: Spurious kevent type %d.\n",
+                               timer->handle, kev[i].filter);
+               }
+       }
+
+       ao2_unlock(timer);
+
        return 0;
 }
 
 static int kqueue_timer_enable_continuous(void *data)
 {
        struct kqueue_timer *timer = data;
+       int retval;
+
+       ao2_lock(timer);
+
+       if (!timer->is_continuous) {
+               ast_debug(5, "[%d]: Enable Continuous\n", timer->handle);
+               retval = kqueue_timer_enable_continuous_event(timer);
+               if (retval == -1) {
+                       ast_log(LOG_ERROR,
+                               "[%d]: Error signaling continuous event: %s\n",
+                               timer->handle, strerror(errno));
+               }
+               timer->is_continuous = 1;
+       }
 
-       kqueue_set_nsecs(timer, 1);
-       timer->is_continuous = 1;
-       timer->unacked = 0;
+       ao2_unlock(timer);
 
        return 0;
 }
@@ -182,10 +344,22 @@ static int kqueue_timer_enable_continuous(void *data)
 static int kqueue_timer_disable_continuous(void *data)
 {
        struct kqueue_timer *timer = data;
+       int retval;
+
+       ao2_lock(timer);
+
+       if (timer->is_continuous) {
+               ast_debug(5, "[%d]: Disable Continuous\n", timer->handle);
+               retval = kqueue_timer_disable_continuous_event(timer);
+               if (retval == -1) {
+                       ast_log(LOG_ERROR,
+                               "[%d]: Error clearing continuous event: %s\n",
+                               timer->handle, strerror(errno));
+               }
+               timer->is_continuous = 0;
+       }
 
-       kqueue_set_nsecs(timer, timer->nsecs);
-       timer->is_continuous = 0;
-       timer->unacked = 0;
+       ao2_unlock(timer);
 
        return 0;
 }
@@ -193,21 +367,12 @@ static int kqueue_timer_disable_continuous(void *data)
 static enum ast_timer_event kqueue_timer_get_event(void *data)
 {
        struct kqueue_timer *timer = data;
-       enum ast_timer_event res = -1;
-       struct timespec sixty_seconds = { 60, 0 };
-       struct kevent kev;
+       enum ast_timer_event res;
 
-       /* If we have non-ACKed events, just return immediately */
-       if (timer->unacked == 0) {
-               if (kevent(timer->handle, NULL, 0, &kev, 1, &sixty_seconds) > 0) {
-                       timer->unacked += kev.data;
-               } else {
-                       perror("kevent");
-               }
-       }
-
-       if (timer->unacked > 0) {
-               res = timer->is_continuous ? AST_TIMING_EVENT_CONTINUOUS : AST_TIMING_EVENT_EXPIRED;
+       if (timer->is_continuous) {
+               res = AST_TIMING_EVENT_CONTINUOUS;
+       } else {
+               res = AST_TIMING_EVENT_EXPIRED;
        }
 
        return res;
@@ -215,8 +380,7 @@ static enum ast_timer_event kqueue_timer_get_event(void *data)
 
 static unsigned int kqueue_timer_get_max_rate(void *data)
 {
-       /* Actually, the max rate is 2^64-1 seconds, but that's not representable in a 32-bit integer. */
-       return UINT_MAX;
+       return INTPTR_MAX > UINT_MAX ? UINT_MAX : INTPTR_MAX;
 }
 
 static int kqueue_timer_fd(void *data)
@@ -273,8 +437,8 @@ AST_TEST_DEFINE(test_kqueue_timing)
                        res = AST_TEST_FAIL;
                        break;
                }
-               if (kt->unacked == 0) {
-                       ast_test_status_update(test, "Unacked events is 0, but there should be at least 1.\n");
+               if (kqueue_timer_ack(kt, 1) != 0) {
+                       ast_test_status_update(test, "Acking event failed.\n");
                        res = AST_TEST_FAIL;
                        break;
                }
@@ -292,15 +456,15 @@ AST_TEST_DEFINE(test_kqueue_timing)
                                res = AST_TEST_FAIL;
                                break;
                        }
+                       if (kqueue_timer_ack(kt, 1) != 0) {
+                               ast_test_status_update(test, "Acking event failed.\n");
+                               res = AST_TEST_FAIL;
+                               break;
+                       }
+
                }
                diff = ast_tvdiff_us(ast_tvnow(), start);
                ast_test_status_update(test, "diff is %llu\n", diff);
-               /*
-               if (abs(diff - kt->unacked) == 0) {
-                       ast_test_status_update(test, "Unacked events should be around 1000, not %llu\n", kt->unacked);
-                       res = AST_TEST_FAIL;
-               }
-               */
        } while (0);
        kqueue_timer_close(kt);
        return res;
@@ -313,8 +477,8 @@ AST_TEST_DEFINE(test_kqueue_timing)
  * Module loading including tests for configuration or dependencies.
  * This function can return AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_DECLINE,
  * or AST_MODULE_LOAD_SUCCESS. If a dependency or environment variable fails
- * tests return AST_MODULE_LOAD_FAILURE. If the module can not load the 
- * configuration file or other non-critical problem return 
+ * tests return AST_MODULE_LOAD_FAILURE. If the module can not load the
+ * configuration file or other non-critical problem return
  * AST_MODULE_LOAD_DECLINE. On success return AST_MODULE_LOAD_SUCCESS.
  */
 static int load_module(void)