Genericize the allocation and destruction of taskprocessor listeners.
authorMark Michelson <mmichelson@digium.com>
Fri, 9 Nov 2012 22:28:10 +0000 (22:28 +0000)
committerMark Michelson <mmichelson@digium.com>
Fri, 9 Nov 2012 22:28:10 +0000 (22:28 +0000)
The goal of this is to take the responsibility away from individual
listeners to be sure to properly unref the taskprocessor.

git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/taskprocessor.h
main/astobj2.c
main/taskprocessor.c

index df66f59..a92e1f3 100644 (file)
@@ -63,10 +63,14 @@ enum ast_tps_options {
 struct ast_taskprocessor_listener;
 
 struct ast_taskprocessor_listener_callbacks {
+       /*! Allocate the listener's private data */
+       void *(*alloc)(struct ast_taskprocessor_listener *listener);
        /*! Indicates a task was pushed to the processor */
        void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
        /*! Indicates the task processor has become empty */
        void (*emptied)(struct ast_taskprocessor_listener *listener);
+       /*! Destroy the listener's private data */
+       void (*destroy)(void *private_data);
 };
 
 struct ast_taskprocessor_listener {
@@ -75,6 +79,9 @@ struct ast_taskprocessor_listener {
        void *private_data;
 };
 
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+               struct ast_taskprocessor_listener_callbacks *callbacks);
+
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
  *
index 082dfc0..b36cee8 100644 (file)
@@ -431,6 +431,7 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
        int ret;
 
        if (obj == NULL) {
+               ast_backtrace();
                ast_assert(0);
                return -1;
        }
index bd94103..4ca01f9 100644 (file)
@@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt,
        ast_cond_signal(&pvt->cond);
 }
 
-static void default_listener_destroy(void *obj)
+static void listener_destroy(void *obj)
 {
        struct ast_taskprocessor_listener *listener = obj;
-       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
 
-       default_tps_wake_up(pvt, 1);
-       pthread_join(pvt->poll_thread, NULL);
-       pvt->poll_thread = AST_PTHREADT_NULL;
-       ast_mutex_destroy(&pvt->lock);
-       ast_cond_destroy(&pvt->cond);
-       ast_free(pvt);
+       listener->callbacks->destroy(listener->private_data);
 
        ao2_ref(listener->tps, -1);
        listener->tps = NULL;
@@ -173,6 +167,35 @@ static void *tps_processing_function(void *data)
        return NULL;
 }
 
+static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
+{
+       struct default_taskprocessor_listener_pvt *pvt;
+
+       pvt = ast_calloc(1, sizeof(*pvt));
+       if (!pvt) {
+               return NULL;
+       }
+       ast_cond_init(&pvt->cond, NULL);
+       ast_mutex_init(&pvt->lock);
+       pvt->poll_thread = AST_PTHREADT_NULL;
+       if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+               return NULL;
+       }
+       return pvt;
+}
+
+static void default_listener_destroy(void *obj)
+{
+       struct default_taskprocessor_listener_pvt *pvt = obj;
+
+       default_tps_wake_up(pvt, 1);
+       pthread_join(pvt->poll_thread, NULL);
+       pvt->poll_thread = AST_PTHREADT_NULL;
+       ast_mutex_destroy(&pvt->lock);
+       ast_cond_destroy(&pvt->cond);
+       ast_free(pvt);
+}
+
 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
        struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
@@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener)
 }
 
 static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+       .alloc = default_listener_alloc,
        .task_pushed = default_task_pushed,
        .emptied = default_emptied,
+       .destroy = default_listener_destroy,
 };
 
 /*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
        return tps->name;
 }
 
-static struct ast_taskprocessor_listener *default_listener_alloc(void)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+               struct ast_taskprocessor_listener_callbacks *callbacks)
 {
-       struct ast_taskprocessor_listener *listener;
-       struct default_taskprocessor_listener_pvt *pvt;
-
-       listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+       RAII_VAR(struct ast_taskprocessor_listener *, listener,
+                       ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+       
        if (!listener) {
                return NULL;
        }
-       pvt = ast_calloc(1, sizeof(*pvt));
-       if (!pvt) {
-               ao2_ref(listener, -1);
-               return NULL;
-       }
-       listener->callbacks = &default_listener_callbacks;
-       listener->private_data = pvt;
-       ast_cond_init(&pvt->cond, NULL);
-       ast_mutex_init(&pvt->lock);
-       pvt->poll_thread = AST_PTHREADT_NULL;
-       if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
-               ao2_ref(listener, -1);
+       listener->callbacks = callbacks;
+       listener->private_data = listener->callbacks->alloc(listener);
+       if (!listener->private_data) {
                return NULL;
        }
+
+       ao2_ref(listener, +1);
        return listener;
 }
 
@@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
                return NULL;
        }
        /* Create a new taskprocessor. Start by creating a default listener */
-       listener = default_listener_alloc();
+       listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
+       if (!listener) {
+               return NULL;
+       }
 
        p = ast_taskprocessor_create_with_listener(name, listener);
+       if (!p) {
+               ao2_ref(listener, -1);
+               return NULL;
+       }
+
+       /* Unref listener here since the taskprocessor has gained a reference to the listener */
        ao2_ref(listener, -1);
        return p;