threadpool, res_pjsip: Add serializer group shutdown API calls.
authorRichard Mudgett <rmudgett@digium.com>
Fri, 19 Jun 2015 21:16:17 +0000 (16:16 -0500)
committerRichard Mudgett <rmudgett@digium.com>
Thu, 25 Jun 2015 19:33:44 +0000 (14:33 -0500)
A module trying to unload needs to wait for all serializers it creates and
uses to complete processing before unloading.

ASTERISK-24907
Reported by: Kevin Harwell

Change-Id: I8c80b90f2f82754e8dbb02ddf3c9121e5e966059

include/asterisk/res_pjsip.h
include/asterisk/threadpool.h
main/threadpool.c
res/res_pjsip.c

index 747396c..9475d6d 100644 (file)
@@ -1102,6 +1102,23 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
  */
 struct ast_taskprocessor *ast_sip_create_serializer(void);
 
+struct ast_serializer_shutdown_group;
+
+/*!
+ * \brief Create a new serializer for SIP tasks
+ * \since 13.5.0
+ *
+ * See \ref ast_threadpool_serializer for more information on serializers.
+ * SIP creates serializers so that tasks operating on similar data will run
+ * in sequence.
+ *
+ * \param shutdown_group Group shutdown controller. (NULL if no group association)
+ *
+ * \retval NULL Failure
+ * \retval non-NULL Newly-created serializer
+ */
+struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group);
+
 /*!
  * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
  *
index 1c67058..75ce0e4 100644 (file)
@@ -195,6 +195,28 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo
  */
 void ast_threadpool_shutdown(struct ast_threadpool *pool);
 
+struct ast_serializer_shutdown_group;
+
+/*!
+ * \brief Create a serializer group shutdown control object.
+ * \since 13.5.0
+ *
+ * \return ao2 object to control shutdown of a serializer group.
+ */
+struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void);
+
+/*!
+ * \brief Wait for the serializers in the group to shutdown with timeout.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL)
+ * \param timeout Number of seconds to wait for the serializers in the group to shutdown.
+ *     Zero if the timeout is disabled.
+ *
+ * \return Number of seriaizers that did not get shutdown within the timeout.
+ */
+int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout);
+
 /*!
  * \brief Get the threadpool serializer currently associated with this thread.
  * \since 14.0.0
@@ -234,9 +256,40 @@ struct ast_taskprocessor *ast_threadpool_serializer_get_current(void);
  *
  * \param name Name of the serializer. (must be unique)
  * \param pool \ref ast_threadpool for execution.
+ *
  * \return \ref ast_taskprocessor for enqueuing work.
  * \return \c NULL on error.
  */
 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool);
 
+/*!
+ * \brief Serialized execution of tasks within a \ref ast_threadpool.
+ * \since 13.5.0
+ *
+ * A \ref ast_taskprocessor with the same contract as a default taskprocessor
+ * (tasks execute serially) except instead of executing out of a dedicated
+ * thread, execution occurs in a thread from a \ref ast_threadpool. Think of it
+ * as a lightweight thread.
+ *
+ * While it guarantees that each task will complete before executing the next,
+ * there is no guarantee as to which thread from the \c pool individual tasks
+ * will execute. This normally only matters if your code relys on thread
+ * specific information, such as thread locals.
+ *
+ * Use ast_taskprocessor_unreference() to dispose of the returned \ref
+ * ast_taskprocessor.
+ *
+ * Only a single taskprocessor with a given name may exist. This function will fail
+ * if a taskprocessor with the given name already exists.
+ *
+ * \param name Name of the serializer. (must be unique)
+ * \param pool \ref ast_threadpool for execution.
+ * \param shutdown_group Group shutdown controller. (NULL if no group association)
+ *
+ * \return \ref ast_taskprocessor for enqueuing work.
+ * \return \c NULL on error.
+ */
+struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
+       struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
+
 #endif /* ASTERISK_THREADPOOL_H */
index 6b412d2..d97a7ad 100644 (file)
@@ -1126,18 +1126,126 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta
        ast_cond_signal(&worker->cond);
 }
 
+/*! Serializer group shutdown control object. */
+struct ast_serializer_shutdown_group {
+       /*! Shutdown thread waits on this conditional. */
+       ast_cond_t cond;
+       /*! Count of serializers needing to shutdown. */
+       int count;
+};
+
+static void serializer_shutdown_group_dtor(void *vdoomed)
+{
+       struct ast_serializer_shutdown_group *doomed = vdoomed;
+
+       ast_cond_destroy(&doomed->cond);
+}
+
+struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
+{
+       struct ast_serializer_shutdown_group *shutdown_group;
+
+       shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
+       if (!shutdown_group) {
+               return NULL;
+       }
+       ast_cond_init(&shutdown_group->cond, NULL);
+       return shutdown_group;
+}
+
+int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
+{
+       int remaining;
+       ast_mutex_t *lock;
+
+       if (!shutdown_group) {
+               return 0;
+       }
+
+       lock = ao2_object_get_lockaddr(shutdown_group);
+       ast_assert(lock != NULL);
+
+       ao2_lock(shutdown_group);
+       if (timeout) {
+               struct timeval start;
+               struct timespec end;
+
+               start = ast_tvnow();
+               end.tv_sec = start.tv_sec + timeout;
+               end.tv_nsec = start.tv_usec * 1000;
+               while (shutdown_group->count) {
+                       if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
+                               /* Error or timed out waiting for the count to reach zero. */
+                               break;
+                       }
+               }
+       } else {
+               while (shutdown_group->count) {
+                       if (ast_cond_wait(&shutdown_group->cond, lock)) {
+                               /* Error */
+                               break;
+                       }
+               }
+       }
+       remaining = shutdown_group->count;
+       ao2_unlock(shutdown_group);
+       return remaining;
+}
+
+/*!
+ * \internal
+ * \brief Increment the number of serializer members in the group.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller.
+ *
+ * \return Nothing
+ */
+static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
+{
+       ao2_lock(shutdown_group);
+       ++shutdown_group->count;
+       ao2_unlock(shutdown_group);
+}
+
+/*!
+ * \internal
+ * \brief Decrement the number of serializer members in the group.
+ * \since 13.5.0
+ *
+ * \param shutdown_group Group shutdown controller.
+ *
+ * \return Nothing
+ */
+static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
+{
+       ao2_lock(shutdown_group);
+       --shutdown_group->count;
+       if (!shutdown_group->count) {
+               ast_cond_signal(&shutdown_group->cond);
+       }
+       ao2_unlock(shutdown_group);
+}
+
 struct serializer {
+       /*! Threadpool the serializer will use to process the jobs. */
        struct ast_threadpool *pool;
+       /*! Which group will wait for this serializer to shutdown. */
+       struct ast_serializer_shutdown_group *shutdown_group;
 };
 
 static void serializer_dtor(void *obj)
 {
        struct serializer *ser = obj;
+
        ao2_cleanup(ser->pool);
        ser->pool = NULL;
+       ao2_cleanup(ser->shutdown_group);
+       ser->shutdown_group = NULL;
 }
 
-static struct serializer *serializer_create(struct ast_threadpool *pool)
+static struct serializer *serializer_create(struct ast_threadpool *pool,
+       struct ast_serializer_shutdown_group *shutdown_group)
 {
        struct serializer *ser;
 
@@ -1147,6 +1255,7 @@ static struct serializer *serializer_create(struct ast_threadpool *pool)
        }
        ao2_ref(pool, +1);
        ser->pool = pool;
+       ser->shutdown_group = ao2_bump(shutdown_group);
        return ser;
 }
 
@@ -1187,6 +1296,10 @@ static int serializer_start(struct ast_taskprocessor_listener *listener)
 static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
 {
        struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+
+       if (ser->shutdown_group) {
+               serializer_shutdown_group_dec(ser->shutdown_group);
+       }
        ao2_cleanup(ser);
 }
 
@@ -1201,27 +1314,35 @@ struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
        return ast_threadstorage_get_ptr(&current_serializer);
 }
 
-struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
+       struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 {
-       RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
-       RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
-       struct ast_taskprocessor *tps = NULL;
+       struct serializer *ser;
+       struct ast_taskprocessor_listener *listener;
+       struct ast_taskprocessor *tps;
 
-       ser = serializer_create(pool);
+       ser = serializer_create(pool, shutdown_group);
        if (!ser) {
                return NULL;
        }
 
        listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
        if (!listener) {
+               ao2_ref(ser, -1);
                return NULL;
        }
-       ser = NULL; /* ownership transferred to listener */
+       /* ser ref transferred to listener */
 
        tps = ast_taskprocessor_create_with_listener(name, listener);
-       if (!tps) {
-               return NULL;
+       if (tps && shutdown_group) {
+               serializer_shutdown_group_inc(shutdown_group);
        }
 
+       ao2_ref(listener, -1);
        return tps;
 }
+
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+{
+       return ast_threadpool_serializer_group(name, pool, NULL);
+}
index 8d5adf6..23cf2f0 100644 (file)
@@ -3327,20 +3327,25 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text)
        return 0;
 }
 
-struct ast_taskprocessor *ast_sip_create_serializer(void)
+struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group)
 {
        struct ast_taskprocessor *serializer;
        char name[AST_UUID_STR_LEN];
 
        ast_uuid_generate_str(name, sizeof(name));
 
-       serializer = ast_threadpool_serializer(name, sip_threadpool);
+       serializer = ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group);
        if (!serializer) {
                return NULL;
        }
        return serializer;
 }
 
+struct ast_taskprocessor *ast_sip_create_serializer(void)
+{
+       return ast_sip_create_serializer_group(NULL);
+}
+
 /*!
  * \internal
  * \brief Shutdown the serializers in the default pool.