Remove alloc and destroy callbacks from the taskprocessor.
authorMark Michelson <mmichelson@digium.com>
Tue, 15 Jan 2013 18:40:36 +0000 (18:40 +0000)
committerMark Michelson <mmichelson@digium.com>
Tue, 15 Jan 2013 18:40:36 +0000 (18:40 +0000)
Now user data is allocated by the creator of the taskprocessor
listener and that user data is passed into ast_taskprocessor_listener_alloc().
Similarly, freeing of the user data is left up to the user himself. He can
free the data when the taskprocessor shuts down, or he can choose to hold
onto it if it makes sense to do so.

This, unsurprisingly, makes threadpool allocation a LOT cleaner now.

git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@379120 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/taskprocessor.h
main/taskprocessor.c
main/threadpool.c
tests/test_taskprocessor.c

index 7720547..c64a8f9 100644 (file)
@@ -75,17 +75,6 @@ struct ast_taskprocessor_listener;
 
 struct ast_taskprocessor_listener_callbacks {
        /*!
-        * \brief Allocate the listener's private data
-        *
-        * This is called during taskprocesor creation.
-        * It is not necessary to assign the private data to the listener.
-        *
-        * \param listener The listener to which the private data belongs
-        * \retval NULL Error while attempting to initialize private data
-        * \retval non-NULL Allocated private data
-        */
-       void *(*alloc)(struct ast_taskprocessor_listener *listener);
-       /*!
         * \brief The taskprocessor has started completely
         *
         * This indicates that the taskprocessor is fully set up and the listener
@@ -111,7 +100,8 @@ struct ast_taskprocessor_listener_callbacks {
         * \brief Indicates the taskprocessor wishes to die.
         *
         * All operations on the task processor must to be stopped in
-        * this callback.
+        * this callback. This is an opportune time to free the listener's
+        * user data if it is not going to be used anywhere else.
         *
         * After this callback returns, it is NOT safe to operate on the
         * listener's reference to the taskprocessor.
@@ -119,15 +109,6 @@ struct ast_taskprocessor_listener_callbacks {
         * \param listener The listener
         */
        void (*shutdown)(struct ast_taskprocessor_listener *listener);
-       /*!
-        * \brief Destroy the listener's private data
-        *
-        * It is required that you free the private data in this callback
-        * in addition to the private data's individual fields.
-        *
-        * \param private_data The listener's private data
-        */
-       void (*destroy)(void *private_data);
 };
 
 /*!
@@ -146,7 +127,7 @@ struct ast_taskprocessor_listener {
        /*! The taskprocessor that the listener is listening to */
        struct ast_taskprocessor *tps;
        /*! Data private to the listener */
-       void *private_data;
+       void *user_data;
 };
 
 /*!
@@ -158,10 +139,11 @@ struct ast_taskprocessor_listener {
  * callbacks.
  *
  * \param callbacks The callbacks to assign to the listener
+ * \param user_data The user data for the listener
  * \retval NULL Failure
  * \retval non-NULL The newly allocated taskprocessor listener
  */
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks);
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data);
 
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
index 95bff72..911eb76 100644 (file)
@@ -151,7 +151,7 @@ static void *tps_processing_function(void *data)
 {
        struct ast_taskprocessor_listener *listener = data;
        struct ast_taskprocessor *tps = listener->tps;
-       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+       struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
        int dead = 0;
 
        while (!dead) {
@@ -162,23 +162,9 @@ static void *tps_processing_function(void *data)
        return NULL;
 }
 
-static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
-{
-       struct default_taskprocessor_listener_pvt *pvt;
-
-       pvt = ast_calloc(1, sizeof(*pvt));
-       if (!pvt) {
-               return NULL;
-       }
-       ast_cond_init(&pvt->cond, NULL);
-       ast_mutex_init(&pvt->lock);
-       pvt->poll_thread = AST_PTHREADT_NULL;
-       return pvt;
-}
-
 static int default_listener_start(struct ast_taskprocessor_listener *listener)
 {
-       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+       struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
        if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
                return -1;
@@ -189,41 +175,33 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
 
 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
-       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+       struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
        if (was_empty) {
                default_tps_wake_up(pvt, 0);
        }
 }
 
-static void default_emptied(struct ast_taskprocessor_listener *listener)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
 {
-       /* No-op */
+       ast_mutex_destroy(&pvt->lock);
+       ast_cond_destroy(&pvt->cond);
+       ast_free(pvt);
 }
 
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
-       struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+       struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
        default_tps_wake_up(pvt, 1);
        pthread_join(pvt->poll_thread, NULL);
        pvt->poll_thread = AST_PTHREADT_NULL;
-}
-
-static void default_listener_destroy(void *obj)
-{
-       struct default_taskprocessor_listener_pvt *pvt = obj;
-       ast_mutex_destroy(&pvt->lock);
-       ast_cond_destroy(&pvt->cond);
-       ast_free(pvt);
+       default_listener_pvt_destroy(pvt);
 }
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
-       .alloc = default_listener_alloc,
        .start = default_listener_start,
        .task_pushed = default_task_pushed,
-       .emptied = default_emptied,
        .shutdown = default_listener_shutdown,
-       .destroy = default_listener_destroy,
 };
 
 /*!
@@ -474,33 +452,41 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
        return tps->name;
 }
 
-static void listener_destroy(void *obj)
-{
-       struct ast_taskprocessor_listener *listener = obj;
-
-       listener->callbacks->destroy(listener->private_data);
-}
-
 static void listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
        listener->callbacks->shutdown(listener);
        ao2_ref(listener->tps, -1);
 }
 
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 {
        RAII_VAR(struct ast_taskprocessor_listener *, listener,
-                       ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+                       ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
 
        if (!listener) {
                return NULL;
        }
        listener->callbacks = callbacks;
+       listener->user_data = user_data;
 
        ao2_ref(listener, +1);
        return listener;
 }
 
+static void *default_listener_pvt_alloc(void)
+{
+       struct default_taskprocessor_listener_pvt *pvt;
+
+       pvt = ast_calloc(1, sizeof(*pvt));
+       if (!pvt) {
+               return NULL;
+       }
+       ast_cond_init(&pvt->cond, NULL);
+       ast_mutex_init(&pvt->lock);
+       pvt->poll_thread = AST_PTHREADT_NULL;
+       return pvt;
+}
+
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
  * create the taskprocessor if we were told via ast_tps_options to return a reference only
  * if it already exists */
@@ -508,6 +494,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
 {
        struct ast_taskprocessor *p;
        struct ast_taskprocessor_listener *listener;
+       struct default_taskprocessor_listener_pvt *pvt;
 
        if (ast_strlen_zero(name)) {
                ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
@@ -522,13 +509,19 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
                return NULL;
        }
        /* Create a new taskprocessor. Start by creating a default listener */
-       listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks);
+       pvt = default_listener_pvt_alloc();
+       if (!pvt) {
+               return NULL;
+       }
+       listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
        if (!listener) {
+               default_listener_pvt_destroy(pvt);
                return NULL;
        }
 
        p = ast_taskprocessor_create_with_listener(name, listener);
        if (!p) {
+               default_listener_pvt_destroy(pvt);
                ao2_ref(listener, -1);
                return NULL;
        }
@@ -565,14 +558,6 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
        ao2_ref(p, +1);
        listener->tps = p;
 
-       /* Allocation of private data must come after setting taskprocessor parameters
-        * so that listeners who rely on taskprocessor data will have access to it.
-        */
-       listener->private_data = listener->callbacks->alloc(listener);
-       if (!listener->private_data) {
-               return NULL;
-       }
-
        if (!(ao2_link(tps_singletons, p))) {
                ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
                return NULL;
@@ -656,7 +641,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
        }
        ao2_unlock(tps);
 
-       if (size == 0) {
+       if (size == 0 && tps->listener->callbacks->emptied) {
                tps->listener->callbacks->emptied(tps->listener);
                return 0;
        }
index 05a5f8d..db38daa 100644 (file)
@@ -365,23 +365,25 @@ static void threadpool_destructor(void *obj)
  * is because the threadpool exists as the private data on a taskprocessor
  * listener.
  *
- * \param listener The taskprocessor listener where the threadpool will live.
+ * \param name The name of the threadpool.
+ * \param options The options the threadpool uses.
  * \retval NULL Could not initialize threadpool properly
  * \retval non-NULL The newly-allocated threadpool
  */
-static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
+static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
 {
        RAII_VAR(struct ast_threadpool *, pool,
                        ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
-       struct ast_str *name = ast_str_create(64);
+       struct ast_str *control_tps_name = ast_str_create(64);
 
-       if (!name) {
+       if (!control_tps_name) {
                return NULL;
        }
 
-       ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps));
+       ast_str_set(&control_tps_name, 0, "%s-control", name);
 
-       pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT);
+       pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
+       ast_free(control_tps_name);
        if (!pool->control_tps) {
                return NULL;
        }
@@ -397,6 +399,7 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
        if (!pool->zombie_threads) {
                return NULL;
        }
+       pool->options = *options;
 
        ao2_ref(pool, +1);
        return pool;
@@ -545,7 +548,7 @@ static int queued_task_pushed(void *data)
 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
                int was_empty)
 {
-       struct ast_threadpool *pool = listener->private_data;
+       struct ast_threadpool *pool = listener->user_data;
        struct task_pushed_data *tpd;
        SCOPED_AO2LOCK(lock, pool);
 
@@ -585,7 +588,7 @@ static int queued_emptied(void *data)
  */
 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
 {
-       struct ast_threadpool *pool = listener->private_data;
+       struct ast_threadpool *pool = listener->user_data;
        SCOPED_AO2LOCK(lock, pool);
 
        if (pool->shutting_down) {
@@ -608,26 +611,11 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
  */
 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
 {
-       struct ast_threadpool *pool = listener->private_data;
+       struct ast_threadpool *pool = listener->user_data;
 
        ao2_cleanup(pool->active_threads);
        ao2_cleanup(pool->idle_threads);
        ao2_cleanup(pool->zombie_threads);
-}
-
-/*!
- * \brief Taskprocessor listener destroy callback
- *
- * Since the threadpool is an ao2 object, all that is necessary is to
- * decrease the refcount. Since the control taskprocessor should already
- * be destroyed by this point, this should be the final reference to the
- * threadpool.
- *
- * \param private_data The threadpool to destroy
- */
-static void threadpool_destroy(void *private_data)
-{
-       struct ast_threadpool *pool = private_data;
        ao2_cleanup(pool);
 }
 
@@ -635,12 +623,10 @@ static void threadpool_destroy(void *private_data)
  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
  */
 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
-       .alloc = threadpool_alloc,
        .start = threadpool_tps_start,
        .task_pushed = threadpool_tps_task_pushed,
        .emptied = threadpool_tps_emptied,
        .shutdown = threadpool_tps_shutdown,
-       .destroy = threadpool_destroy,
 };
 
 /*!
@@ -854,12 +840,15 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
                struct ast_threadpool_listener *listener,
                int initial_size, const struct ast_threadpool_options *options)
 {
-       struct ast_threadpool *pool;
        struct ast_taskprocessor *tps;
-       RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
-                       ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
-                       ao2_cleanup);
+       RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup);
 
+       if (!pool) {
+               return NULL;
+       }
+       
+       tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
        if (!tps_listener) {
                return NULL;
        }
@@ -870,19 +859,17 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
        }
 
        tps = ast_taskprocessor_create_with_listener(name, tps_listener);
-
        if (!tps) {
                return NULL;
        }
 
-       pool = tps_listener->private_data;
        pool->tps = tps;
        if (listener) {
                ao2_ref(listener, +1);
                pool->listener = listener;
        }
-       pool->options = *options;
        ast_threadpool_set_size(pool, initial_size);
+       ao2_ref(pool, +1);
        return pool;
 }
 
index 377a2b3..c04eeeb 100644 (file)
@@ -260,7 +260,7 @@ struct test_listener_pvt {
 /*!
  * \brief test taskprocessor listener's alloc callback
  */
-static void *test_alloc(struct ast_taskprocessor_listener *listener)
+static void *test_listener_pvt_alloc(void)
 {
        struct test_listener_pvt *pvt;
 
@@ -283,7 +283,7 @@ static int test_start(struct ast_taskprocessor_listener *listener)
  */
 static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
-       struct test_listener_pvt *pvt = listener->private_data;
+       struct test_listener_pvt *pvt = listener->user_data;
        ++pvt->num_pushed;
        if (was_empty) {
                ++pvt->num_was_empty;
@@ -295,7 +295,7 @@ static void test_task_pushed(struct ast_taskprocessor_listener *listener, int wa
  */
 static void test_emptied(struct ast_taskprocessor_listener *listener)
 {
-       struct test_listener_pvt *pvt = listener->private_data;
+       struct test_listener_pvt *pvt = listener->user_data;
        ++pvt->num_emptied;
 }
 
@@ -304,26 +304,15 @@ static void test_emptied(struct ast_taskprocessor_listener *listener)
  */
 static void test_shutdown(struct ast_taskprocessor_listener *listener)
 {
-       struct test_listener_pvt *pvt = listener->private_data;
+       struct test_listener_pvt *pvt = listener->user_data;
        pvt->shutdown = 1;
 }
 
-/*!
- * \brief test taskprocessor listener's destroy callback.
- */
-static void test_destroy(void *private_data)
-{
-       struct test_listener_pvt *pvt = private_data;
-       ast_free(pvt);
-}
-
 static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
-       .alloc = test_alloc,
        .start = test_start,
        .task_pushed = test_task_pushed,
        .emptied = test_emptied,
        .shutdown = test_shutdown,
-       .destroy = test_destroy,
 };
 
 /*!
@@ -381,9 +370,9 @@ static int check_stats(struct ast_test *test, const struct test_listener_pvt *pv
  */
 AST_TEST_DEFINE(taskprocessor_listener)
 {
-       struct ast_taskprocessor *tps;
-       struct ast_taskprocessor_listener *listener;
-       struct test_listener_pvt *pvt;
+       struct ast_taskprocessor *tps = NULL;
+       struct ast_taskprocessor_listener *listener = NULL;
+       struct test_listener_pvt *pvt = NULL;
        enum ast_test_result_state res = AST_TEST_PASS;
 
        switch (cmd) {
@@ -398,10 +387,17 @@ AST_TEST_DEFINE(taskprocessor_listener)
                break;
        }
 
-       listener = ast_taskprocessor_listener_alloc(&test_callbacks);
+       pvt = test_listener_pvt_alloc();
+       if (!pvt) {
+               ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
+               return AST_TEST_FAIL;
+       }
+
+       listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
        if (!listener) {
                ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
-               return AST_TEST_FAIL;
+               res = AST_TEST_FAIL;
+               goto test_exit;
        }
 
        tps = ast_taskprocessor_create_with_listener("test_listener", listener);
@@ -411,8 +407,6 @@ AST_TEST_DEFINE(taskprocessor_listener)
                goto test_exit;
        }
 
-       pvt = listener->private_data;
-
        ast_taskprocessor_push(tps, listener_test_task, NULL);
 
        if (check_stats(test, pvt, 1, 0, 1) < 0) {
@@ -449,9 +443,10 @@ AST_TEST_DEFINE(taskprocessor_listener)
        }
 
 test_exit:
-       ao2_ref(listener, -1);
+       ao2_cleanup(listener);
        /* This is safe even if tps is NULL */
        ast_taskprocessor_unreference(tps);
+       ast_free(pvt);
        return res;
 }