Move taskprocessors to use a listener model.
authorMark Michelson <mmichelson@digium.com>
Thu, 8 Nov 2012 23:27:16 +0000 (23:27 +0000)
committerMark Michelson <mmichelson@digium.com>
Thu, 8 Nov 2012 23:27:16 +0000 (23:27 +0000)
Taskprocessors are now divided into two units: the task queue
and their listeners.

When a task is added to the queue, the listener is notified and
can take whatever action is desired. This means that taskprocessors
are no longer confined to having their tasks executed within a
single thread.

A default taskprocessor listener has been added that mirrors the
old taskprocessor behavior.

I've tested it by running Asterisk and placing calls. It appears
to work as expected. I'm going to do some cleaning up first and
then write some unit tests to be sure everything works as expected.

git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376118 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/taskprocessor.h
main/taskprocessor.c

index 0f1876e..df66f59 100644 (file)
@@ -60,6 +60,21 @@ enum ast_tps_options {
        TPS_REF_IF_EXISTS = (1 << 0),
 };
 
+struct ast_taskprocessor_listener;
+
+struct ast_taskprocessor_listener_callbacks {
+       /*! Indicates a task was pushed to the processor */
+       void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
+       /*! Indicates the task processor has become empty */
+       void (*emptied)(struct ast_taskprocessor_listener *listener);
+};
+
+struct ast_taskprocessor_listener {
+       struct ast_taskprocessor_listener_callbacks *callbacks;
+       struct ast_taskprocessor *tps;
+       void *private_data;
+};
+
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
  *
@@ -75,6 +90,16 @@ enum ast_tps_options {
 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create);
 
 /*!
+ * \brief Create a taskprocessor with a custom listener
+ *
+ * \param name The name of the taskprocessor to create
+ * \param listener The listener for operations on this taskprocessor
+ * \retval NULL Failure
+ * \reval non-NULL success
+ */
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
+
+/*!
  * \brief Unreference the specified taskprocessor and its reference count will decrement.
  *
  * Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy
@@ -97,6 +122,14 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
 
 /*!
+ * \brief Pop a task off the taskprocessor and execute it.
+ * \param tps The taskprocessor from which to execute.
+ * \retval 0 There is no further work to be done.
+ * \retval 1 Tasks still remain in the taskprocessor queue.
+ */
+int ast_taskprocessor_execute(struct ast_taskprocessor *tps);
+
+/*!
  * \brief Return the name of the taskprocessor singleton
  * \since 1.6.1
  */
index 912f891..b433ca9 100644 (file)
@@ -83,6 +83,7 @@ struct ast_taskprocessor {
        AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
        /*! \brief Taskprocessor singleton list entry */
        AST_LIST_ENTRY(ast_taskprocessor) list;
+       struct ast_taskprocessor_listener *listener;
 };
 #define TPS_MAX_BUCKETS 7
 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
@@ -122,6 +123,83 @@ static struct ast_cli_entry taskprocessor_clis[] = {
        AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
 };
 
+struct default_taskprocessor_listener_pvt {
+       pthread_t poll_thread;
+       ast_mutex_t lock;
+       ast_cond_t cond;
+       int wake_up;
+       int dead;
+};
+
+static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+{
+       SCOPED_MUTEX(lock, &pvt->lock); 
+       pvt->wake_up = 1;
+       pvt->dead = should_die;
+       ast_cond_signal(&pvt->cond);
+}
+
+static void default_listener_destroy(void *obj)
+{
+       struct ast_taskprocessor_listener *listener = obj;
+       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+       default_tps_wake_up(pvt, 1);
+       pthread_join(pvt->poll_thread, NULL);
+       pvt->poll_thread = AST_PTHREADT_NULL;
+       ast_mutex_destroy(&pvt->lock);
+       ast_cond_destroy(&pvt->cond);
+       ast_free(pvt);
+
+       ao2_ref(listener->tps, -1);
+       listener->tps = NULL;
+}
+
+static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+{
+       SCOPED_MUTEX(lock, &pvt->lock);
+       while (!pvt->wake_up) {
+               ast_cond_wait(&pvt->cond, lock);
+       }
+       pvt->wake_up = 0;
+       return pvt->dead;
+}
+
+/* this is the task processing worker function */
+static void *tps_processing_function(void *data)
+{
+       struct ast_taskprocessor_listener *listener = data;
+       struct ast_taskprocessor *tps = listener->tps;
+       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+       int dead = 0;
+
+       while (!dead) {
+               if (!ast_taskprocessor_execute(tps)) {
+                       dead = default_tps_idle(pvt);
+               }
+       }
+       return NULL;
+}
+
+static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+{
+       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+       if (was_empty) {
+               default_tps_wake_up(pvt, 0);
+       }
+}
+
+static void default_emptied(struct ast_taskprocessor_listener *listener)
+{
+       /* No-op */
+}
+
+static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+       .task_pushed = default_task_pushed,
+       .emptied = default_emptied,
+};
+
 /*! \internal \brief Clean up resources on Asterisk shutdown */
 static void tps_shutdown(void)
 {
@@ -286,75 +364,22 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
        return CLI_SUCCESS;
 }
 
-/* this is the task processing worker function */
-static void *tps_processing_function(void *data)
-{
-       struct ast_taskprocessor *i = data;
-       struct tps_task *t;
-       int size;
-
-       if (!i) {
-               ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
-               return NULL;
-       }
-
-       while (i->poll_thread_run) {
-               ast_mutex_lock(&i->taskprocessor_lock);
-               if (!i->poll_thread_run) {
-                       ast_mutex_unlock(&i->taskprocessor_lock);
-                       break;
-               }
-               if (!(size = tps_taskprocessor_depth(i))) {
-                       ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
-                       if (!i->poll_thread_run) {
-                               ast_mutex_unlock(&i->taskprocessor_lock);
-                               break;
-                       }
-               }
-               ast_mutex_unlock(&i->taskprocessor_lock);
-               /* stuff is in the queue */
-               if (!(t = tps_taskprocessor_pop(i))) {
-                       ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
-                       continue;
-               }
-               if (!t->execute) {
-                       ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
-                       tps_task_free(t);
-                       continue;
-               }
-               t->execute(t->datap);
-
-               ast_mutex_lock(&i->taskprocessor_lock);
-               if (i->stats) {
-                       i->stats->_tasks_processed_count++;
-                       if (size > i->stats->max_qsize) {
-                               i->stats->max_qsize = size;
-                       }
-               }
-               ast_mutex_unlock(&i->taskprocessor_lock);
-
-               tps_task_free(t);
-       }
-       while ((t = tps_taskprocessor_pop(i))) {
-               tps_task_free(t);
-       }
-       return NULL;
-}
-
 /* hash callback for astobj2 */
 static int tps_hash_cb(const void *obj, const int flags)
 {
        const struct ast_taskprocessor *tps = obj;
+       const char *name = flags & OBJ_KEY ? obj : tps->name;
 
-       return ast_str_case_hash(tps->name);
+       return ast_str_case_hash(name);
 }
 
 /* compare callback for astobj2 */
 static int tps_cmp_cb(void *obj, void *arg, int flags)
 {
        struct ast_taskprocessor *lhs = obj, *rhs = arg;
+       const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
 
-       return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
+       return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
 /* destroy the taskprocessor */
@@ -368,20 +393,21 @@ static void tps_taskprocessor_destroy(void *tps)
        }
        ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
        /* kill it */
-       ast_mutex_lock(&t->taskprocessor_lock);
-       t->poll_thread_run = 0;
-       ast_cond_signal(&t->poll_cond);
-       ast_mutex_unlock(&t->taskprocessor_lock);
-       pthread_join(t->poll_thread, NULL);
-       t->poll_thread = AST_PTHREADT_NULL;
        ast_mutex_destroy(&t->taskprocessor_lock);
-       ast_cond_destroy(&t->poll_cond);
        /* free it */
        if (t->stats) {
                ast_free(t->stats);
                t->stats = NULL;
        }
        ast_free((char *) t->name);
+       if (t->listener) {
+               /* This code should not be reached since the listener
+                * should have been destroyed before the taskprocessor could
+                * be destroyed
+                */
+               ao2_ref(t->listener, -1);
+               t->listener = NULL;
+       }
 }
 
 /* pop the front task and return it */
@@ -416,80 +442,120 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
        return tps->name;
 }
 
+static struct ast_taskprocessor_listener *default_listener_alloc(void)
+{
+       struct ast_taskprocessor_listener *listener;
+       struct default_taskprocessor_listener_pvt *pvt;
+
+       listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+       if (!listener) {
+               return NULL;
+       }
+       pvt = ast_calloc(1, sizeof(*pvt));
+       if (!pvt) {
+               ao2_ref(listener, -1);
+               return NULL;
+       }
+       listener->callbacks = &default_listener_callbacks;
+       listener->private_data = pvt;
+       ast_cond_init(&pvt->cond, NULL);
+       ast_mutex_init(&pvt->lock);
+       pvt->poll_thread = AST_PTHREADT_NULL;
+       if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+               ao2_ref(listener, -1);
+               return NULL;
+       }
+       return listener;
+}
+
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
  * create the taskprocessor if we were told via ast_tps_options to return a reference only
  * if it already exists */
 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
 {
-       struct ast_taskprocessor *p, tmp_tps = {
-               .name = name,
-       };
+       struct ast_taskprocessor *p;
+       struct ast_taskprocessor_listener *listener;
 
        if (ast_strlen_zero(name)) {
                ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
                return NULL;
        }
-       ao2_lock(tps_singletons);
-       p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
+       p = ao2_find(tps_singletons, name, OBJ_KEY);
        if (p) {
                ao2_unlock(tps_singletons);
                return p;
        }
        if (create & TPS_REF_IF_EXISTS) {
                /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
-               ao2_unlock(tps_singletons);
                return NULL;
        }
-       /* create a new taskprocessor */
-       if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
-               ao2_unlock(tps_singletons);
+       /* Create a new taskprocessor. Start by creating a default listener */
+       listener = default_listener_alloc();
+
+       p = ast_taskprocessor_create_with_listener(name, listener);
+       ao2_ref(listener, -1);
+       return p;
+
+}
+
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
+{
+       RAII_VAR(struct ast_taskprocessor *, p,
+                       ao2_alloc(sizeof(*p), tps_taskprocessor_destroy),
+                       ao2_cleanup);
+
+       if (!p) {
                ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
                return NULL;
        }
 
-       ast_cond_init(&p->poll_cond, NULL);
-       ast_mutex_init(&p->taskprocessor_lock);
-
        if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
-               ao2_unlock(tps_singletons);
                ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
-               ao2_ref(p, -1);
                return NULL;
        }
        if (!(p->name = ast_strdup(name))) {
-               ao2_unlock(tps_singletons);
-               ao2_ref(p, -1);
-               return NULL;
-       }
-       p->poll_thread_run = 1;
-       p->poll_thread = AST_PTHREADT_NULL;
-       if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
-               ao2_unlock(tps_singletons);
-               ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
                ao2_ref(p, -1);
                return NULL;
        }
+
+       ao2_ref(listener, +1);
+       p->listener = listener;
+
+       ao2_ref(p, +1);
+       listener->tps = p;
+
        if (!(ao2_link(tps_singletons, p))) {
-               ao2_unlock(tps_singletons);
                ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
-               ao2_ref(p, -1);
                return NULL;
        }
-       ao2_unlock(tps_singletons);
+
+       /* RAII_VAR will decrement the refcount at the end of the function.
+        * Since we want to pass back a reference to p, we bump the refcount
+        */
+       ao2_ref(p, +1);
        return p;
 }
 
 /* decrement the taskprocessor reference count and unlink from the container if necessary */
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 {
-       if (tps) {
-               ao2_lock(tps_singletons);
-               ao2_unlink(tps_singletons, tps);
-               if (ao2_ref(tps, -1) > 1) {
-                       ao2_link(tps_singletons, tps);
-               }
-               ao2_unlock(tps_singletons);
+       struct ast_taskprocessor_listener *listener;
+       if (!tps) {
+               return NULL;
        }
+
+       if (ao2_ref(tps, -1) > 3) {
+               return NULL;
+       }
+       /* If we're down to 3 references, then those must be:
+        * 1. The reference we just got rid of
+        * 2. The container
+        * 3. The listener
+        */
+       ao2_unlink(tps_singletons, tps);
+       listener = tps->listener;
+       tps->listener = NULL;
+       ao2_ref(listener, -1);
        return NULL;
 }
 
@@ -497,6 +563,7 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
 {
        struct tps_task *t;
+       int previous_size;
 
        if (!tps || !task_exe) {
                ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
@@ -508,9 +575,38 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
        }
        ast_mutex_lock(&tps->taskprocessor_lock);
        AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
-       tps->tps_queue_size++;
-       ast_cond_signal(&tps->poll_cond);
+       previous_size = tps->tps_queue_size++;
        ast_mutex_unlock(&tps->taskprocessor_lock);
+       tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
        return 0;
 }
 
+int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
+{
+       struct tps_task *t;
+       int size;
+
+       if (!(t = tps_taskprocessor_pop(tps))) {
+               return 0;
+       }
+
+       t->execute(t->datap);
+
+       tps_task_free(t);
+
+       ast_mutex_lock(&tps->taskprocessor_lock);
+       size = tps_taskprocessor_depth(tps);
+       if (tps->stats) {
+               tps->stats->_tasks_processed_count++;
+               if (size > tps->stats->max_qsize) {
+                       tps->stats->max_qsize = size;
+               }
+       }
+       ast_mutex_unlock(&tps->taskprocessor_lock);
+
+       if (size == 0) {
+               tps->listener->callbacks->emptied(tps->listener);
+               return 0;
+       }
+       return 1;
+}