res_pjsip: Add serialized scheduler (res_pjsip/pjsip_scheduler.c)
authorGeorge Joseph <george.joseph@fairview5.com>
Thu, 17 Mar 2016 17:28:26 +0000 (11:28 -0600)
committerGeorge Joseph <george.joseph@fairview5.com>
Thu, 14 Apr 2016 19:16:21 +0000 (13:16 -0600)
There are several places that do scheduled tasks or periodic housecleaning,
each with its own implementation:

* res_pjsip_keepalive has a thread that sends keepalives.
* pjsip_distributor has a thread that cleans up expired unidentified requests.
* res_pjsip_registrar_expire has a thread that cleans up expired contacts.
* res_pjsip_pubsub uses ast_sched directly and then calls ast_sip_push_task.
* res_pjsip_sdp_rtp also uses ast_sched to send keepalives.

There are also places where we should be doing scheduled work but aren't.
A good example are the places we have sorcery observers to start registration
or qualify.  These don't work when changes are made to a backend database
without a pjsip reload.  We need to check periodically.

As a first step to solving these issues, a new ast_sip_sched facility has
been created.

ast_sip_sched wraps ast_sched but only uses ast_sched as a scheduled queue.
When a task is ready to run, ast_sip_task_pusk is called for it. This ensures
that the task is executed in a PJLIB registered thread and doesn't hold up the
ast_sched thread so it can immediately continue processing the queue.  The
serializer used by ast_sip_sched is one of your choosing or a random one from
the res_pjsip pool if you don't choose one.

Another feature is the ability to automatically clean up the task_data when the
task expires (if ever).  If it's an ao2 object, it will be dereferenced, if
it's a malloc'd object it will be freed.  This is selectable when the task is
scheduled.  Even if you choose to not auto dereference an ao2 task data object,
the scheduler itself maintains a reference to it while the task is under it's
control.  This prevents the data from disappearing out from under the task.

There are two scheduling models.

AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at
the specific interval.  That is, every "interval" milliseconds, regardless of
how long the task takes.  If the task takes longer than the interval, it will
be scheduled at the next available multiple of interval.  For exmaple: If the
task has an interval of 60 secs and the task takes 70 secs (it better not),
the next invocation will happen at 120 seconds.

AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should
start "interval" milliseconds after the current invocation has finished.

Also, the same ast_sched facility for fixed or variable intervals exists.  The
task's return code in conjunction with the AST_SIP_SCHED_TASK_FIXED or
AST_SIP_SCHED_TASK_VARIABLE flags controls the next invocation start time.

One res_pjsip.h housekeeping change was made.  The pjsip header files were
added to the top.  There have been a few cases lately where I've needed
res_pjsip.h just for ast_sip calls and had compiles fail spectacularly because
I didn't add the pjsip header files to my source even though I never referenced
any pjsip calls.

Finally, a few new convenience APIs were added to astobj2 to make things a
little easier in the scheduler.  ao2_ref_and_lock() calls ao2_ref() and
ao2_lock() in one go.  ao2_unlock_and_unref() does the reverse. A few macros
were also copied from res_phoneprov because I got tired of having to duplicate
the same hash, sort and compare functions over and over again. The
AO2_STRING_FIELD_(HASH|SORT|CMP)_FN macros will insert functions suitable for
aor_container_alloc into your source.

This facility can be used immediately for the situations where we already have
a thread that wakes up periodically or do some scheduled work.  For the
registration and qualify issues, additional sorcery and schema changes would
need to be made so that we can easily detect changed objects on a periodic
basis without having to pull the entire database back to check.  I'm thinking
of a last-updated timestamp on the rows but more on this later.

Change-Id: I7af6ad2b2d896ea68e478aa1ae201d6dd016ba1c

include/asterisk/astobj2.h
include/asterisk/res_pjsip.h
res/res_pjsip.c
res/res_pjsip/include/res_pjsip_private.h
res/res_pjsip/pjsip_scheduler.c [new file with mode: 0644]
tests/test_res_pjsip_scheduler.c [new file with mode: 0644]

index c28dd23..0472c1b 100644 (file)
@@ -20,6 +20,7 @@
 #include "asterisk/compat.h"
 #include "asterisk/lock.h"
 #include "asterisk/linkedlists.h"
+#include "asterisk/inline_api.h"
 
 /*! \file
  * \ref AstObj2
@@ -725,6 +726,46 @@ int __ao2_trylock(void *a, enum ao2_lock_req lock_how, const char *file, const c
 void *ao2_object_get_lockaddr(void *obj);
 
 
+/*!
+ * \brief Increment reference count on an object and lock it
+ * \since 13.9.0
+ *
+ * \param[in] obj A pointer to the ao2 object
+ * \retval 0 The object is not an ao2 object or wasn't locked successfully
+ * \retval 1 The object's reference count was incremented and was locked
+ */
+AST_INLINE_API(
+int ao2_ref_and_lock(void *obj),
+{
+       ao2_ref(obj, +1);
+       if (ao2_lock(obj)) {
+               ao2_ref(obj, -1);
+               return 0;
+       }
+       return 1;
+}
+)
+
+/*!
+ * \brief Unlock an object and decrement its reference count
+ * \since 13.9.0
+ *
+ * \param[in] obj A pointer to the ao2 object
+ * \retval 0 The object is not an ao2 object or wasn't unlocked successfully
+ * \retval 1 The object was unlocked and it's reference count was decremented
+ */
+AST_INLINE_API(
+int ao2_unlock_and_unref(void *obj),
+{
+       if (ao2_unlock(obj)) {
+               return 0;
+       }
+       ao2_ref(obj, -1);
+
+       return 1;
+}
+)
+
 /*! Global ao2 object holder structure. */
 struct ao2_global_obj {
        /*! Access lock to the held ao2 object. */
@@ -1903,4 +1944,97 @@ void ao2_iterator_cleanup(struct ao2_iterator *iter);
  */
 int ao2_iterator_count(struct ao2_iterator *iter);
 
+/*!
+ * \brief Creates a hash function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to hash
+ *
+ * AO2_STRING_FIELD_HASH_CB(mystruct, myfield) will produce a function
+ * named mystruct_hash_fn which hashes mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_HASH_FN(stype, field) \
+static int stype ## _hash_fn(const void *obj, const int flags) \
+{ \
+       const struct stype *object = obj; \
+       const char *key; \
+       switch (flags & OBJ_SEARCH_MASK) { \
+       case OBJ_SEARCH_KEY: \
+               key = obj; \
+               break; \
+       case OBJ_SEARCH_OBJECT: \
+               key = object->field; \
+               break; \
+       default: \
+               ast_assert(0); \
+               return 0; \
+       } \
+       return ast_str_hash(key); \
+}
+
+/*!
+ * \brief Creates a compare function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to compare
+ *
+ * AO2_STRING_FIELD_CMP_FN(mystruct, myfield) will produce a function
+ * named mystruct_cmp_fn which compares mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_CMP_FN(stype, field) \
+static int stype ## _cmp_fn(void *obj, void *arg, int flags) \
+{ \
+       const struct stype *object_left = obj, *object_right = arg; \
+       const char *right_key = arg; \
+       int cmp; \
+       switch (flags & OBJ_SEARCH_MASK) { \
+       case OBJ_SEARCH_OBJECT: \
+               right_key = object_right->field; \
+       case OBJ_SEARCH_KEY: \
+               cmp = strcmp(object_left->field, right_key); \
+               break; \
+       case OBJ_SEARCH_PARTIAL_KEY: \
+               cmp = strncmp(object_left->field, right_key, strlen(right_key)); \
+               break; \
+       default: \
+               cmp = 0; \
+               break; \
+       } \
+       if (cmp) { \
+               return 0; \
+       } \
+       return CMP_MATCH; \
+}
+
+/*!
+ * \brief Creates a sort function for a structure string field.
+ * \param stype The structure type
+ * \param field The string field in the structure to compare
+ *
+ * AO2_STRING_FIELD_SORT_FN(mystruct, myfield) will produce a function
+ * named mystruct_sort_fn which compares mystruct->myfield.
+ */
+#define AO2_STRING_FIELD_SORT_FN(stype, field) \
+static int stype ## _sort_fn(const void *obj, const void *arg, int flags) \
+{ \
+       const struct stype *object_left = obj; \
+       const struct stype *object_right = arg; \
+       const char *right_key = arg; \
+       int cmp; \
+\
+       switch (flags & OBJ_SEARCH_MASK) { \
+       case OBJ_SEARCH_OBJECT: \
+               right_key = object_right->field; \
+               /* Fall through */ \
+       case OBJ_SEARCH_KEY: \
+               cmp = strcmp(object_left->field, right_key); \
+               break; \
+       case OBJ_SEARCH_PARTIAL_KEY: \
+               cmp = strncmp(object_left->field, right_key, strlen(right_key)); \
+               break; \
+       default: \
+               cmp = 0; \
+               break; \
+       } \
+       return cmp; \
+}
+
 #endif /* _ASTERISK_ASTOBJ2_H */
index 1bfae66..9015cb8 100644 (file)
 #ifndef _RES_PJSIP_H
 #define _RES_PJSIP_H
 
+#include <pjsip.h>
+/* Needed for SUBSCRIBE, NOTIFY, and PUBLISH method definitions */
+#include <pjsip_simple.h>
+#include <pjsip/sip_transaction.h>
+#include <pj/timer.h>
+#include <pjlib.h>
+
 #include "asterisk/stringfields.h"
 /* Needed for struct ast_sockaddr */
 #include "asterisk/netsock2.h"
@@ -1166,8 +1173,9 @@ struct ast_sip_auth *ast_sip_get_artificial_auth(void);
  */
 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
 
-/*!
- * \page Threading model for SIP
+/*! \defgroup pjsip_threading PJSIP Threading Model
+ * @{
+ * \page PJSIP PJSIP Threading Model
  *
  * There are three major types of threads that SIP will have to deal with:
  * \li Asterisk threads
@@ -1216,6 +1224,19 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  * previous tasks pushed with the same serializer have completed. For more information
  * on serializers and the benefits they provide, see \ref ast_threadpool_serializer
  *
+ * \par Scheduler
+ *
+ * Some situations require that a task run periodically or at a future time.  Normally
+ * the ast_sched functionality would be used but ast_sched only uses 1 thread for all
+ * tasks and that thread isn't registered with PJLIB and therefore can't do any PJSIP
+ * related work.
+ *
+ * ast_sip_sched uses ast_sched only as a scheduled queue.  When a task is ready to run,
+ * it's pushed to a Serializer to be invoked asynchronously by a Servant.  This ensures
+ * that the task is executed in a PJLIB registered thread and allows the ast_sched thread
+ * to immediately continue processing the queue.  The Serializer used by ast_sip_sched
+ * is one of your choosing or a random one from the res_pjsip pool if you don't choose one.
+ *
  * \note
  *
  * Do not make assumptions about individual threads based on a corresponding serializer.
@@ -1224,6 +1245,8 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  * tasks, even though they are all guaranteed to be executed in sequence.
  */
 
+typedef int (*ast_sip_task)(void *user_data);
+
 /*!
  * \brief Create a new serializer for SIP tasks
  * \since 13.8.0
@@ -1334,6 +1357,214 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
 int ast_sip_thread_is_servant(void);
 
 /*!
+ * \brief Task flags for the res_pjsip scheduler
+ *
+ * The default is AST_SIP_SCHED_TASK_FIXED
+ *                | AST_SIP_SCHED_TASK_DATA_NOT_AO2
+ *                | AST_SIP_SCHED_TASK_DATA_NO_CLEANUP
+ *                | AST_SIP_SCHED_TASK_PERIODIC
+ */
+enum ast_sip_scheduler_task_flags {
+       /*!
+        * The defaults
+        */
+       AST_SIP_SCHED_TASK_DEFAULTS = (0 << 0),
+
+       /*!
+        * Run at a fixed interval.
+        * Stop scheduling if the callback returns 0.
+        * Any other value is ignored.
+        */
+       AST_SIP_SCHED_TASK_FIXED = (0 << 0),
+       /*!
+        * Run at a variable interval.
+        * Stop scheduling if the callback returns 0.
+        * Any other return value is used as the new interval.
+        */
+       AST_SIP_SCHED_TASK_VARIABLE = (1 << 0),
+
+       /*!
+        * The task data is not an AO2 object.
+        */
+       AST_SIP_SCHED_TASK_DATA_NOT_AO2 = (0 << 1),
+       /*!
+        * The task data is an AO2 object.
+        * A reference count will be held by the scheduler until
+        * after the task has run for the final time (if ever).
+        */
+       AST_SIP_SCHED_TASK_DATA_AO2 = (1 << 1),
+
+       /*!
+        * Don't take any cleanup action on the data
+        */
+       AST_SIP_SCHED_TASK_DATA_NO_CLEANUP = (0 << 3),
+       /*!
+        * If AST_SIP_SCHED_TASK_DATA_AO2 is set, decrement the reference count
+        * otherwise call ast_free on it.
+        */
+       AST_SIP_SCHED_TASK_DATA_FREE = ( 1 << 3 ),
+
+       /*! \brief AST_SIP_SCHED_TASK_PERIODIC
+        * The task is scheduled at multiples of interval
+        * \see Interval
+        */
+       AST_SIP_SCHED_TASK_PERIODIC = (0 << 4),
+       /*! \brief AST_SIP_SCHED_TASK_DELAY
+        * The next invocation of the task is at last finish + interval
+        * \see Interval
+        */
+       AST_SIP_SCHED_TASK_DELAY = (1 << 4),
+};
+
+/*!
+ * \brief Scheduler task data structure
+ */
+struct ast_sip_sched_task;
+
+/*!
+ * \brief Schedule a task to run in the res_pjsip thread pool
+ * \since 13.9.0
+ *
+ * \param serializer The serializer to use.  If NULL, don't use a serializer (see note below)
+ * \param interval The invocation interval in milliseconds (see note below)
+ * \param sip_task The task to invoke
+ * \param name An optional name to associate with the task
+ * \param task_data Optional data to pass to the task
+ * \param flags One of enum ast_sip_scheduler_task_type
+ *
+ * \returns Pointer to \ref ast_sip_sched_task ao2 object which must be dereferenced when done.
+ *
+ * \paragraph Serialization
+ *
+ * Specifying a serializer guarantees serialized execution but NOT specifying a serializer
+ * may still result in tasks being effectively serialized if the thread pool is busy.
+ * The point of the serializer BTW is not to prevent parallel executions of the SAME task.
+ * That happens automatically (see below).  It's to prevent the task from running at the same
+ * time as other work using the same serializer, whether or not it's being run by the scheduler.
+ *
+ * \paragraph Interval
+ *
+ * The interval is used to calculate the next time the task should run.  There are two models.
+ *
+ * \ref AST_SIP_SCHED_TASK_PERIODIC specifies that the invocations of the task occur at the
+ * specific interval.  That is, every \ref "interval" milliseconds, regardless of how long the task
+ * takes. If the task takes longer than \ref interval, it will be scheduled at the next available
+ * multiple of \ref interval.  For exmaple: If the task has an interval of 60 seconds and the task
+ * takes 70 seconds, the next invocation will happen at 120 seconds.
+ *
+ * \ref AST_SIP_SCHED_TASK_DELAY specifies that the next invocation of the task should start
+ * at \ref interval milliseconds after the current invocation has finished.
+ *
+ */
+struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
+       int interval, ast_sip_task sip_task, char *name, void *task_data,
+       enum ast_sip_scheduler_task_flags flags);
+
+/*!
+ * \brief Cancels the next invocation of a task
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Only cancels future invocations not the currently running invocation.
+ */
+int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Cancels the next invocation of a task by name
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Only cancels future invocations not the currently running invocation.
+ */
+int ast_sip_sched_task_cancel_by_name(const char *name);
+
+/*!
+ * \brief Gets the last start and end times of the task
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \param[out] when_queued Pointer to a timeval structure to contain the time when queued
+ * \param[out] last_start Pointer to a timeval structure to contain the time when last started
+ * \param[out] last_end Pointer to a timeval structure to contain the time when last ended
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Any of the pointers can be NULL if you don't need them.
+ */
+int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
+       struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end);
+
+/*!
+ * \brief Gets the last start and end times of the task by name
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \param[out] when_queued Pointer to a timeval structure to contain the time when queued
+ * \param[out] last_start Pointer to a timeval structure to contain the time when last started
+ * \param[out] last_end Pointer to a timeval structure to contain the time when last ended
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \note Any of the pointers can be NULL if you don't need them.
+ */
+int ast_sip_sched_task_get_times_by_name(const char *name,
+       struct timeval *when_queued, struct timeval *last_start, struct timeval *last_end);
+
+/*!
+ * \brief Gets the number of milliseconds until the next invocation
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled
+ */
+int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Gets the number of milliseconds until the next invocation
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \return The number of milliseconds until the next invocation or -1 if the task isn't scheduled
+ */
+int ast_sip_sched_task_get_next_run_by_name(const char *name);
+
+/*!
+ * \brief Checks if the task is currently running
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 not running
+ * \retval 1 running
+ */
+int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd);
+
+/*!
+ * \brief Checks if the task is currently running
+ * \since 13.9.0
+ *
+ * \param name The task name
+ * \retval 0 not running or not found
+ * \retval 1 running
+ */
+int ast_sip_sched_is_task_running_by_name(const char *name);
+
+/*!
+ * \brief Gets the task name
+ * \since 13.9.0
+ *
+ * \param schtd The task structure pointer
+ * \retval 0 success
+ * \retval 1 failure
+ */
+int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen);
+
+/*!
+ *  @}
+ */
+
+/*!
  * \brief SIP body description
  *
  * This contains a type and subtype that will be added as
index d12951c..82bd7c9 100644 (file)
@@ -3636,11 +3636,7 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void
                serializer = serializer_pool[pos];
        }
 
-       if (serializer) {
-               return ast_taskprocessor_push(serializer, sip_task, task_data);
-       } else {
-               return ast_threadpool_push(sip_threadpool, sip_task, task_data);
-       }
+       return ast_taskprocessor_push(serializer, sip_task, task_data);
 }
 
 struct sync_task_data {
@@ -4158,6 +4154,11 @@ static int load_module(void)
                goto error;
        }
 
+       if (ast_sip_initialize_scheduler()) {
+               ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
+               goto error;
+       }
+
        /* Now load all the pjproject infrastructure. */
        if (load_pjsip()) {
                goto error;
@@ -4196,8 +4197,10 @@ static int load_module(void)
        return AST_MODULE_LOAD_SUCCESS;
 
 error:
-       /* These functions all check for NULLs and are safe to call at any time */
        unload_pjsip(NULL);
+
+       /* These functions all check for NULLs and are safe to call at any time */
+       ast_sip_destroy_scheduler();
        serializer_pool_shutdown();
        ast_threadpool_shutdown(sip_threadpool);
 
@@ -4228,7 +4231,7 @@ static int unload_module(void)
         * so we have to push the work to the threadpool to handle
         */
        ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL);
-
+       ast_sip_destroy_scheduler();
        serializer_pool_shutdown();
        ast_threadpool_shutdown(sip_threadpool);
 
index 24b8edf..b175b5e 100644 (file)
@@ -313,4 +313,23 @@ struct ast_sip_contact_status *ast_res_pjsip_find_or_create_contact_status(const
  */
 int ast_sip_validate_uri_length(const char *uri);
 
+/*!
+ * \brief Initialize scheduler
+ * \since 13.9.0
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_initialize_scheduler(void);
+
+/*!
+ * \internal
+ * \brief Destroy scheduler
+ * \since 13.9.0
+ *
+ * \retval -1 failure
+ * \retval 0 success
+ */
+int ast_sip_destroy_scheduler(void);
+
 #endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c
new file mode 100644 (file)
index 0000000..a5d406c
--- /dev/null
@@ -0,0 +1,495 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2016, Fairview 5 Engineering, LLC
+ *
+ * George Joseph <george.joseph@fairview5.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief res_pjsip Scheduler
+ *
+ * \author George Joseph <george.joseph@fairview5.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_REGISTER_FILE()
+
+#include "asterisk/res_pjsip.h"
+#include "include/res_pjsip_private.h"
+#include "asterisk/res_pjsip_cli.h"
+
+#define TASK_BUCKETS 53
+
+static struct ast_sched_context *scheduler_context;
+static struct ao2_container *tasks;
+static int task_count;
+
+struct ast_sip_sched_task {
+       /*! ast_sip_sched task id */
+       uint32_t task_id;
+       /*! ast_sched scheudler id */
+       int current_scheduler_id;
+       /*! task is currently running */
+       int is_running;
+       /*! task */
+       ast_sip_task task;
+       /*! task data */
+       void *task_data;
+       /*! reschedule interval in milliseconds */
+       int interval;
+       /*! the time the task was queued */
+       struct timeval when_queued;
+       /*! the last time the task was started */
+       struct timeval last_start;
+       /*! the last time the task was ended */
+       struct timeval last_end;
+       /*! times run */
+       int run_count;
+       /*! the task reschedule, cleanup and policy flags */
+       enum ast_sip_scheduler_task_flags flags;
+       /*! the serializer to be used (if any) */
+       struct ast_taskprocessor *serializer;
+       /* A name to be associated with the task */
+       char name[0];
+};
+
+AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
+AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
+AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
+
+static int push_to_serializer(const void *data);
+
+/*
+ * This function is run in the context of the serializer.
+ * It runs the task with a simple call and reschedules based on the result.
+ */
+static int run_task(void *data)
+{
+       RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
+       int res;
+       int delay;
+
+       ao2_lock(schtd);
+       schtd->last_start = ast_tvnow();
+       schtd->is_running = 1;
+       schtd->run_count++;
+       ao2_unlock(schtd);
+
+       res = schtd->task(schtd->task_data);
+
+       ao2_lock(schtd);
+       schtd->is_running = 0;
+       schtd->last_end = ast_tvnow();
+
+       /*
+        * Don't restart if the task returned 0 or if the interval
+        * was set to 0 while the task was running
+        */
+       if (!res || !schtd->interval) {
+               schtd->interval = 0;
+               ao2_unlock(schtd);
+               ao2_unlink(tasks, schtd);
+               return -1;
+       }
+
+       if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
+               schtd->interval = res;
+       }
+
+       if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
+               delay = schtd->interval;
+       } else {
+               delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
+       }
+
+       schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd);
+       if (schtd->current_scheduler_id < 0) {
+               schtd->interval = 0;
+               ao2_unlock(schtd);
+               ao2_unlink(tasks, schtd);
+               return -1;
+       }
+
+       ao2_unlock(schtd);
+
+       return 0;
+}
+
+/*
+ * This function is run by the scheduler thread.  Its only job is to push the task
+ * to the serialize and return.  It returns 0 so it's not rescheduled.
+ */
+static int push_to_serializer(const void *data)
+{
+       struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
+
+       if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
+               ao2_ref(schtd, -1);
+       }
+
+       return 0;
+}
+
+int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
+{
+       int res;
+
+       if (!ao2_ref_and_lock(schtd)) {
+               return -1;
+       }
+
+       if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
+               ao2_unlock_and_unref(schtd);
+               return 0;
+       }
+
+       schtd->interval = 0;
+       ao2_unlock_and_unref(schtd);
+       ao2_unlink(tasks, schtd);
+       res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
+
+       return res;
+}
+
+int ast_sip_sched_task_cancel_by_name(const char *name)
+{
+       RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+       if (ast_strlen_zero(name)) {
+               return -1;
+       }
+
+       schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!schtd) {
+               return -1;
+       }
+
+       return ast_sip_sched_task_cancel(schtd);
+}
+
+
+int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
+       struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
+{
+       if (!ao2_ref_and_lock(schtd)) {
+               return -1;
+       }
+
+       if (queued) {
+               memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
+       }
+       if (last_start) {
+               memcpy(last_start, &schtd->last_start, sizeof(struct timeval));
+       }
+       if (last_end) {
+               memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
+       }
+
+       ao2_unlock_and_unref(schtd);
+
+       return 0;
+}
+
+int ast_sip_sched_task_get_times_by_name(const char *name,
+       struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
+{
+       RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+       if (ast_strlen_zero(name)) {
+               return -1;
+       }
+
+       schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!schtd) {
+               return -1;
+       }
+
+       return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
+}
+
+int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
+{
+       if (maxlen <= 0) {
+               return -1;
+       }
+
+       if (!ao2_ref_and_lock(schtd)) {
+               return -1;
+       }
+
+       ast_copy_string(name, schtd->name, maxlen);
+
+       ao2_unlock_and_unref(schtd);
+
+       return 0;
+}
+
+int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
+{
+       int delay;
+       struct timeval since_when;
+       struct timeval now;
+
+       if (!ao2_ref_and_lock(schtd)) {
+               return -1;
+       }
+
+       if (schtd->interval) {
+               delay = schtd->interval;
+               now = ast_tvnow();
+
+               if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
+                       since_when = schtd->is_running ? now : schtd->last_end;
+               } else {
+                       since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
+               }
+
+               delay -= ast_tvdiff_ms(now, since_when);
+
+               delay = delay < 0 ? 0 : delay;
+       } else {
+               delay = -1;
+       }
+
+       ao2_unlock_and_unref(schtd);
+
+       return delay;
+}
+
+int ast_sip_sched_task_get_next_run_by_name(const char *name)
+{
+       RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+       if (ast_strlen_zero(name)) {
+               return -1;
+       }
+
+       schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!schtd) {
+               return -1;
+       }
+
+       return ast_sip_sched_task_get_next_run(schtd);
+}
+
+int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
+{
+       if (!schtd) {
+               return 0;
+       }
+
+       return schtd->is_running;
+}
+
+int ast_sip_sched_is_task_running_by_name(const char *name)
+{
+       RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
+
+       if (ast_strlen_zero(name)) {
+               return 0;
+       }
+
+       schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!schtd) {
+               return 0;
+       }
+
+       return schtd->is_running;
+}
+
+static void schtd_destructor(void *data)
+{
+       struct ast_sip_sched_task *schtd = data;
+
+       if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
+               /* release our own ref, then release the callers if asked to do so */
+               ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
+       } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
+               ast_free(schtd->task_data);
+       }
+}
+
+struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
+       int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags)
+{
+#define ID_LEN 13 /* task_deadbeef */
+       struct ast_sip_sched_task *schtd;
+       int res;
+
+       if (interval < 0) {
+               return NULL;
+       }
+
+       schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
+       if (!schtd) {
+               return NULL;
+       }
+
+       schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
+       schtd->serializer = serializer;
+       schtd->task = sip_task;
+       if (!ast_strlen_zero(name)) {
+               strcpy(schtd->name, name); /* Safe */
+       } else {
+               sprintf(schtd->name, "task_%08x", schtd->task_id);
+       }
+       schtd->task_data = task_data;
+       schtd->flags = flags;
+       schtd->interval = interval;
+       schtd->when_queued = ast_tvnow();
+
+       if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
+               ao2_ref(task_data, +1);
+       }
+       res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd);
+       if (res < 0) {
+               ao2_ref(schtd, -1);
+               return NULL;
+       } else {
+               schtd->current_scheduler_id = res;
+               ao2_link(tasks, schtd);
+       }
+
+       return schtd;
+#undef ID_LEN
+}
+
+static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       struct ao2_iterator i;
+       struct ast_sip_sched_task *schtd;
+       const char *log_format = ast_logger_get_dateformat();
+       struct ast_tm tm;
+       char queued[32];
+       char last_start[32];
+       char last_end[32];
+       int datelen;
+       struct timeval now = ast_tvnow();
+       const char *separator = "======================================";
+
+       switch (cmd) {
+       case CLI_INIT:
+               e->command = "pjsip show scheduled_tasks";
+               e->usage = "Usage: pjsip show scheduled_tasks\n"
+                           "      Show all scheduled tasks\n";
+               return NULL;
+       case CLI_GENERATE:
+               return NULL;
+       }
+
+       if (a->argc != 3) {
+               return CLI_SHOWUSAGE;
+       }
+
+       ast_localtime(&now, &tm, NULL);
+       datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
+
+       ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
+
+       ast_cli(a->fd, " %1$-24s %2$-8s %3$-9s %4$-7s  %6$-*5$s  %7$-*5$s  %8$-*5$s\n",
+               "Task Name", "Interval", "Times Run", "State",
+               datelen, "Queued", "Last Started", "Last Ended");
+
+       ast_cli(a->fd, " %1$-24.24s %2$-8.8s %3$-9.9s %4$-7.7s  %6$-*5$.*5$s  %7$-*5$.*5$s  %8$-*5$.*5$s\n",
+               separator, separator, separator, separator,
+               datelen, separator, separator, separator);
+
+
+       ao2_ref(tasks, +1);
+       ao2_rdlock(tasks);
+       i = ao2_iterator_init(tasks, 0);
+       while ((schtd = ao2_iterator_next(&i))) {
+
+               ast_localtime(&schtd->when_queued, &tm, NULL);
+               ast_strftime(queued, sizeof(queued), log_format, &tm);
+
+               if (ast_tvzero(schtd->last_start)) {
+                       strcpy(last_start, "not yet started");
+               } else {
+                       ast_localtime(&schtd->last_start, &tm, NULL);
+                       ast_strftime(last_start, sizeof(last_start), log_format, &tm);
+               }
+
+               if (ast_tvzero(schtd->last_end)) {
+                       if (ast_tvzero(schtd->last_start)) {
+                               strcpy(last_end, "not yet started");
+                       } else {
+                               strcpy(last_end, "running");
+                       }
+               } else {
+                       ast_localtime(&schtd->last_end, &tm, NULL);
+                       ast_strftime(last_end, sizeof(last_end), log_format, &tm);
+               }
+
+               ast_cli(a->fd, " %1$-24.24s %2$-8.3f %3$-9d %4$-7s  %6$-*5$s  %7$-*5$s  %8$-*5$s\n",
+                       schtd->name,
+                       schtd->interval / 1000.0,
+                       schtd->run_count,
+                       schtd->is_running ? "running" : "waiting",
+                       datelen, queued, last_start, last_end);
+               ao2_cleanup(schtd);
+       }
+       ao2_iterator_destroy(&i);
+       ao2_unlock(tasks);
+       ao2_ref(tasks, -1);
+       ast_cli(a->fd, "\n");
+
+       return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli_commands[] = {
+       AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"),
+};
+
+int ast_sip_initialize_scheduler(void)
+{
+       if (!(scheduler_context = ast_sched_context_create())) {
+               ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
+               return -1;
+       }
+
+       if (ast_sched_start_thread(scheduler_context)) {
+               ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
+               ast_sched_context_destroy(scheduler_context);
+               return -1;
+       }
+
+       tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
+               TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
+       if (!tasks) {
+               ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
+               ast_sched_context_destroy(scheduler_context);
+               return -1;
+       }
+
+       ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
+       return 0;
+}
+
+int ast_sip_destroy_scheduler(void)
+{
+       ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
+       if (scheduler_context) {
+               ast_sched_context_destroy(scheduler_context);
+       }
+
+       ao2_cleanup(tasks);
+       tasks = NULL;
+
+       return 0;
+}
diff --git a/tests/test_res_pjsip_scheduler.c b/tests/test_res_pjsip_scheduler.c
new file mode 100644 (file)
index 0000000..22d2c99
--- /dev/null
@@ -0,0 +1,400 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2016, Fairview 5 Engineering, LLC
+ *
+ * George Joseph <george.joseph@fairview5.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \brief res_pjsip scheduler tests
+ *
+ * \author George Joseph <george.joseph@fairview5.com>
+ *
+ */
+
+/*** MODULEINFO
+       <depend>TEST_FRAMEWORK</depend>
+       <depend>res_pjsip</depend>
+       <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_REGISTER_FILE()
+
+#include <pjsip.h>
+#include "asterisk/test.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/res_pjsip.h"
+#include "asterisk/utils.h"
+
+#define CATEGORY "/res/res_pjsip/scheduler/"
+
+struct test_data {
+       ast_mutex_t lock;
+       ast_cond_t cond;
+       pthread_t tid;
+       struct timeval test_start;
+       struct timeval task_start;
+       struct timeval task_end;
+       int is_servant;
+       int interval;
+       int sleep;
+       int done;
+       struct ast_test *test;
+};
+
+#define S2U(x) (long int)(x * 1000 * 1000)
+#define M2U(x) (long int)(x * 1000)
+
+static int task_1(void *data)
+{
+       struct test_data *test = data;
+
+       test->done = 0;
+       test->task_start = ast_tvnow();
+       test->tid = pthread_self();
+       test->is_servant = ast_sip_thread_is_servant();
+       usleep(M2U(test->sleep));
+       test->task_end = ast_tvnow();
+
+       ast_mutex_lock(&test->lock);
+       test->done = 1;
+       ast_mutex_unlock(&test->lock);
+       ast_cond_signal(&test->cond);
+
+       return test->interval;
+}
+
+
+static void data_cleanup(void *data)
+{
+       struct test_data *test_data = data;
+       ast_mutex_destroy(&test_data->lock);
+       ast_cond_destroy(&test_data->cond);
+}
+
+#define waitfor(x) \
+{ \
+       ast_mutex_lock(&(x)->lock); \
+       while (!(x)->done) { \
+               ast_cond_wait(&(x)->cond, &(x)->lock); \
+       } \
+       (x)->done = 0; \
+       ast_mutex_unlock(&(x)->lock); \
+}
+
+static int scheduler(struct ast_test *test, int serialized)
+{
+       RAII_VAR(struct ast_taskprocessor *, tp1, NULL, ast_taskprocessor_unreference);
+       RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup);
+       RAII_VAR(struct test_data *, test_data2, ao2_alloc(sizeof(*test_data2), data_cleanup), ao2_cleanup);
+       RAII_VAR(struct ast_sip_sched_task *, task1, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_sip_sched_task *, task2, NULL, ao2_cleanup);
+       int duration;
+       int delay;
+       struct timeval task1_start;
+
+       ast_test_validate(test, test_data1 != NULL);
+       ast_test_validate(test, test_data2 != NULL);
+
+       test_data1->test = test;
+       test_data1->test_start = ast_tvnow();
+       test_data1->interval = 2000;
+       test_data1->sleep = 1000;
+       ast_mutex_init(&test_data1->lock);
+       ast_cond_init(&test_data1->cond, NULL);
+
+       test_data2->test = test;
+       test_data2->test_start = ast_tvnow();
+       test_data2->interval = 2000;
+       test_data2->sleep = 1000;
+       ast_mutex_init(&test_data2->lock);
+       ast_cond_init(&test_data2->cond, NULL);
+
+       if (serialized) {
+               ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+                       (test_data1->interval + test_data1->sleep + (MAX(test_data1->interval - test_data2->interval, 0)) + test_data2->sleep) / 1000.0);
+               tp1 = ast_sip_create_serializer("test-scheduler-serializer");
+               ast_test_validate(test, (tp1 != NULL));
+       } else {
+               ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+                       ((MAX(test_data1->interval, test_data2->interval) + MAX(test_data1->sleep, test_data2->sleep)) / 1000.0));
+       }
+
+       task1 = ast_sip_schedule_task(tp1, test_data1->interval, task_1, NULL, test_data1, AST_SIP_SCHED_TASK_FIXED);
+       ast_test_validate(test, task1 != NULL);
+
+       task2 = ast_sip_schedule_task(tp1, test_data2->interval, task_1, NULL, test_data2, AST_SIP_SCHED_TASK_FIXED);
+       ast_test_validate(test, task2 != NULL);
+
+       waitfor(test_data1);
+       ast_sip_sched_task_cancel(task1);
+       ast_test_validate(test, test_data1->is_servant);
+
+       duration = ast_tvdiff_ms(test_data1->task_end, test_data1->test_start);
+       ast_test_validate(test, (duration > ((test_data1->interval + test_data1->sleep) * 0.9))
+               && (duration < ((test_data1->interval + test_data1->sleep) * 1.1)));
+
+       ast_sip_sched_task_get_times(task1, NULL, &task1_start, NULL);
+       delay = ast_tvdiff_ms(task1_start, test_data1->test_start);
+       ast_test_validate(test, (delay > (test_data1->interval * 0.9)
+               && (delay < (test_data1->interval * 1.1))));
+
+       waitfor(test_data2);
+       ast_sip_sched_task_cancel(task2);
+       ast_test_validate(test, test_data2->is_servant);
+
+       if (serialized) {
+               ast_test_validate(test, test_data1->tid == test_data2->tid);
+               ast_test_validate(test, ast_tvdiff_ms(test_data2->task_start, test_data1->task_end) >= 0);
+       } else {
+               ast_test_validate(test, test_data1->tid != test_data2->tid);
+       }
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(serialized_scheduler)
+{
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = CATEGORY;
+               info->summary = "Test res_pjsip serialized scheduler";
+               info->description = "Test res_pjsip serialized scheduler";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       return scheduler(test, 1);
+}
+
+AST_TEST_DEFINE(unserialized_scheduler)
+{
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = CATEGORY;
+               info->summary = "Test res_pjsip unserialized scheduler";
+               info->description = "Test res_pjsip unserialized scheduler";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       return scheduler(test, 0);
+}
+
+static int run_count;
+static int destruct_count;
+
+static int dummy_task(void *data)
+{
+       int *sleep = data;
+
+       usleep(M2U(*sleep));
+       run_count++;
+
+       return 0;
+}
+
+static void test_destructor(void *data)
+{
+       destruct_count++;
+}
+
+AST_TEST_DEFINE(scheduler_cleanup)
+{
+       RAII_VAR(int *, sleep, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+       int interval;
+       int when;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = CATEGORY;
+               info->summary = "Test res_pjsip scheduler cleanup";
+               info->description = "Test res_pjsip scheduler cleanup";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       destruct_count = 0;
+       interval = 1000;
+
+       sleep = ao2_alloc(sizeof(*sleep), test_destructor);
+       ast_test_validate(test, sleep != NULL);
+       *sleep = 500;
+
+       ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+               ((interval * 1.1) + *sleep) / 1000.0);
+
+       task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep,
+               AST_SIP_SCHED_TASK_DATA_AO2 | AST_SIP_SCHED_TASK_DATA_FREE);
+       ast_test_validate(test, task != NULL);
+       usleep(M2U(interval * 0.5));
+       when = ast_sip_sched_task_get_next_run(task);
+       ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6)));
+       usleep(M2U(interval * 0.6));
+       ast_test_validate(test, ast_sip_sched_is_task_running(task));
+
+       usleep(M2U(*sleep));
+
+       ast_test_validate(test, (ast_sip_sched_is_task_running(task) == 0));
+       when = ast_sip_sched_task_get_next_run(task);
+       ast_test_validate(test, (when < 0), res, error);
+       ast_test_validate(test, (ao2_ref(task, 0) == 1));
+       ao2_ref(task, -1);
+       task = NULL;
+       ast_test_validate(test, (destruct_count == 1));
+       sleep = NULL;
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(scheduler_cancel)
+{
+       RAII_VAR(int *, sleep, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+       int interval;
+       int when;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = CATEGORY;
+               info->summary = "Test res_pjsip scheduler cancel task";
+               info->description = "Test res_pjsip scheduler cancel task";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       destruct_count = 0;
+       interval = 1000;
+
+       sleep = ao2_alloc(sizeof(*sleep), test_destructor);
+       ast_test_validate(test, sleep != NULL);
+       *sleep = 500;
+
+       ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+               (interval + *sleep) / 1000.0);
+
+       task = ast_sip_schedule_task(NULL, interval, dummy_task, "dummy", sleep, AST_SIP_SCHED_TASK_DATA_NO_CLEANUP);
+       ast_test_validate(test, task != NULL);
+
+       usleep(M2U(interval * 0.5));
+       when = ast_sip_sched_task_get_next_run_by_name("dummy");
+       ast_test_validate(test, (when > (interval * 0.4) && when < (interval * 0.6)));
+       ast_test_validate(test, !ast_sip_sched_is_task_running_by_name("dummy"));
+       ast_test_validate(test, ao2_ref(task, 0) == 2);
+
+       ast_sip_sched_task_cancel_by_name("dummy");
+
+       when = ast_sip_sched_task_get_next_run(task);
+       ast_test_validate(test, when < 0);
+
+       usleep(M2U(interval));
+       ast_test_validate(test, run_count == 0);
+       ast_test_validate(test, destruct_count == 0);
+       ast_test_validate(test, ao2_ref(task, 0) == 1);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(scheduler_policy)
+{
+       RAII_VAR(struct test_data *, test_data1, ao2_alloc(sizeof(*test_data1), data_cleanup), ao2_cleanup);
+       RAII_VAR(struct ast_sip_sched_task *, task, NULL, ao2_cleanup);
+       int when;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = CATEGORY;
+               info->summary = "Test res_pjsip scheduler cancel task";
+               info->description = "Test res_pjsip scheduler cancel task";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       ast_test_validate(test, test_data1 != NULL);
+
+       destruct_count = 0;
+       run_count = 0;
+       test_data1->test = test;
+       test_data1->test_start = ast_tvnow();
+       test_data1->interval = 1000;
+       test_data1->sleep = 500;
+       ast_mutex_init(&test_data1->lock);
+       ast_cond_init(&test_data1->cond, NULL);
+
+       ast_test_status_update(test, "This test will take about %3.1f seconds\n",
+               ((test_data1->interval * 3) + test_data1->sleep) / 1000.0);
+
+       task = ast_sip_schedule_task(NULL, test_data1->interval, task_1, "test_1", test_data1,
+               AST_SIP_SCHED_TASK_DATA_NO_CLEANUP | AST_SIP_SCHED_TASK_PERIODIC);
+       ast_test_validate(test, task != NULL);
+
+       waitfor(test_data1);
+       when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+       ast_test_validate(test, when > test_data1->interval * 0.9 && when < test_data1->interval * 1.1);
+
+       waitfor(test_data1);
+       when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+       ast_test_validate(test, when > test_data1->interval * 2 * 0.9 && when < test_data1->interval * 2 * 1.1);
+
+       waitfor(test_data1);
+       when = ast_tvdiff_ms(test_data1->task_start, test_data1->test_start);
+       ast_test_validate(test, when > test_data1->interval * 3 * 0.9 && when < test_data1->interval * 3 * 1.1);
+
+       ast_sip_sched_task_cancel(task);
+       ao2_ref(task, -1);
+       task = NULL;
+
+       return AST_TEST_PASS;
+}
+
+static int load_module(void)
+{
+       CHECK_PJSIP_MODULE_LOADED();
+
+       AST_TEST_REGISTER(serialized_scheduler);
+       AST_TEST_REGISTER(unserialized_scheduler);
+       AST_TEST_REGISTER(scheduler_cleanup);
+       AST_TEST_REGISTER(scheduler_cancel);
+       AST_TEST_REGISTER(scheduler_policy);
+       return AST_MODULE_LOAD_SUCCESS;
+}
+
+static int unload_module(void)
+{
+       AST_TEST_UNREGISTER(scheduler_cancel);
+       AST_TEST_UNREGISTER(scheduler_cleanup);
+       AST_TEST_UNREGISTER(unserialized_scheduler);
+       AST_TEST_UNREGISTER(serialized_scheduler);
+       AST_TEST_UNREGISTER(scheduler_policy);
+       return 0;
+}
+
+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "res_pjsip scheduler test module");