res_pjsip_transport_management.c: Misc cleanups to survive shutdown.
authorRichard Mudgett <rmudgett@digium.com>
Wed, 18 May 2016 22:37:27 +0000 (17:37 -0500)
committerRichard Mudgett <rmudgett@digium.com>
Wed, 15 Jun 2016 19:43:36 +0000 (14:43 -0500)
* In unload_module(), reordered destroying things to minimize the window
that the global transports container could be used by other threads on
shutdown.  When shutting down you need to stop things in the opposite
order of creation.

* Put the global transports container into an AO2_GLOBAL_OBJ_STATIC to
eliminate the crash potential by other threads using the container on
shutdown.

* Made struct monitored_transport.sip_received not use
ast_atomic_fetchadd_int() since it is used as a boolean value that is only
set TRUE.  It was previously incremented for every received SIP message
and could theoretically overflow.

* In monitored_transport_state_callback(), allocated the monitored
transport object without a lock since the lock was unused.

* In keepalive_global_loaded(), removed releasing the transports container
if the keepalive_thread could not be started.  I set it up to be tried
again if the user reloads the configuration.

Change-Id: I8d12d16ef564290fa6d25a32334bb5ce8fdf87ff

res/res_pjsip_transport_management.c

index 8ba8c2d..1cf8e50 100644 (file)
@@ -42,7 +42,7 @@
 static const pj_str_t keepalive_packet = { "\r\n\r\n", 4 };
 
 /*! \brief Global container of active transports */
-static struct ao2_container *transports;
+static AO2_GLOBAL_OBJ_STATIC(monitored_transports);
 
 /*! \brief Scheduler context for timing out connections with no data received */
 static struct ast_sched_context *sched;
@@ -84,6 +84,7 @@ static int keepalive_transport_cb(void *obj, void *arg, int flags)
 /*! \brief Thread which sends keepalives to all active connection-oriented transports */
 static void *keepalive_transport_thread(void *data)
 {
+       struct ao2_container *transports;
        pj_thread_desc desc;
        pj_thread_t *thread;
 
@@ -92,6 +93,11 @@ static void *keepalive_transport_thread(void *data)
                return NULL;
        }
 
+       transports = ao2_global_obj_ref(monitored_transports);
+       if (!transports) {
+               return NULL;
+       }
+
        /* Once loaded this module just keeps on going as it is unsafe to stop and change the underlying
         * callback for the transport manager.
         */
@@ -100,6 +106,7 @@ static void *keepalive_transport_thread(void *data)
                ao2_callback(transports, OBJ_NODATA, keepalive_transport_cb, NULL);
        }
 
+       ao2_ref(transports, -1);
        return NULL;
 }
 
@@ -108,7 +115,6 @@ AST_THREADSTORAGE(desc_storage);
 static int idle_sched_cb(const void *data)
 {
        struct monitored_transport *keepalive = (struct monitored_transport *) data;
-       int sip_received = ast_atomic_fetchadd_int(&keepalive->sip_received, 0);
 
        if (!pj_thread_is_registered()) {
                pj_thread_t *thread;
@@ -126,7 +132,7 @@ static int idle_sched_cb(const void *data)
                pj_thread_register("Transport Monitor", *desc, &thread);
        }
 
-       if (!sip_received) {
+       if (!keepalive->sip_received) {
                ast_log(LOG_NOTICE, "Shutting down transport '%s' since no request was received in %d seconds\n",
                                keepalive->transport->info, IDLE_TIMEOUT);
                pjsip_transport_shutdown(keepalive->transport);
@@ -148,23 +154,30 @@ static void monitored_transport_destroy(void *obj)
 static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state,
        const pjsip_transport_state_info *info)
 {
+       struct ao2_container *transports;
+
        /* We only care about reliable transports */
-       if (PJSIP_TRANSPORT_IS_RELIABLE(transport) &&
-                       (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)) {
+       if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
+               && (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)
+               && (transports = ao2_global_obj_ref(monitored_transports))) {
                struct monitored_transport *monitored;
 
                switch (state) {
                case PJSIP_TP_STATE_CONNECTED:
-                       monitored = ao2_alloc(sizeof(*monitored), monitored_transport_destroy);
+                       monitored = ao2_alloc_options(sizeof(*monitored),
+                               monitored_transport_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
                        if (!monitored) {
                                break;
                        }
                        monitored->transport = transport;
                        pjsip_transport_add_ref(monitored->transport);
+
                        ao2_link(transports, monitored);
+
                        if (transport->dir == PJSIP_TP_DIR_INCOMING) {
                                /* Let the scheduler inherit the reference from allocation */
                                if (ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, monitored, 1) < 0) {
+                                       /* Uh Oh.  Could not schedule the idle check.  Kill the transport. */
                                        ao2_unlink(transports, monitored);
                                        ao2_ref(monitored, -1);
                                        pjsip_transport_shutdown(transport);
@@ -181,6 +194,8 @@ static void monitored_transport_state_callback(pjsip_transport *transport, pjsip
                default:
                        break;
                }
+
+               ao2_ref(transports, -1);
        }
 
        /* Forward to the old state callback if present */
@@ -242,7 +257,7 @@ static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
                break;
        }
 
-       return !cmp ? CMP_MATCH | CMP_STOP : 0;
+       return !cmp ? CMP_MATCH : 0;
 }
 
 static void keepalive_global_loaded(const char *object_type)
@@ -265,8 +280,8 @@ static void keepalive_global_loaded(const char *object_type)
 
        if (ast_pthread_create(&keepalive_thread, NULL, keepalive_transport_thread, NULL)) {
                ast_log(LOG_ERROR, "Could not create thread for sending keepalive messages.\n");
-               ao2_ref(transports, -1);
-               return;
+               keepalive_thread = AST_PTHREADT_NULL;
+               keepalive_interval = 0;
        }
 }
 
@@ -283,14 +298,21 @@ static struct ast_sorcery_observer keepalive_global_observer = {
  */
 static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
 {
+       struct ao2_container *transports;
        struct monitored_transport *idle_trans;
 
+       transports = ao2_global_obj_ref(monitored_transports);
+       if (!transports) {
+               return PJ_FALSE;
+       }
+
        idle_trans = ao2_find(transports, rdata->tp_info.transport->obj_name, OBJ_SEARCH_KEY);
+       ao2_ref(transports, -1);
        if (!idle_trans) {
                return PJ_FALSE;
        }
 
-       ast_atomic_fetchadd_int(&idle_trans->sip_received, +1);
+       idle_trans->sip_received = 1;
        ao2_ref(idle_trans, -1);
 
        return PJ_FALSE;
@@ -304,35 +326,38 @@ static pjsip_module idle_monitor_module = {
 
 static int load_module(void)
 {
+       struct ao2_container *transports;
        pjsip_tpmgr *tpmgr;
 
        CHECK_PJSIP_MODULE_LOADED();
 
+       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
+       if (!tpmgr) {
+               ast_log(LOG_ERROR, "No transport manager to attach keepalive functionality to.\n");
+               return AST_MODULE_LOAD_DECLINE;
+       }
+
        transports = ao2_container_alloc(TRANSPORTS_BUCKETS, monitored_transport_hash_fn,
                monitored_transport_cmp_fn);
        if (!transports) {
                ast_log(LOG_ERROR, "Could not create container for transports to perform keepalive on.\n");
                return AST_MODULE_LOAD_DECLINE;
        }
-
-       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
-       if (!tpmgr) {
-               ast_log(LOG_ERROR, "No transport manager to attach keepalive functionality to.\n");
-               ao2_ref(transports, -1);
-               return AST_MODULE_LOAD_DECLINE;
-       }
+       ao2_global_obj_replace_unref(monitored_transports, transports);
+       ao2_ref(transports, -1);
 
        sched = ast_sched_context_create();
        if (!sched) {
                ast_log(LOG_ERROR, "Failed to create keepalive scheduler context.\n");
-               ao2_ref(transports, -1);
+               ao2_global_obj_release(monitored_transports);
                return AST_MODULE_LOAD_DECLINE;
        }
 
        if (ast_sched_start_thread(sched)) {
                ast_log(LOG_ERROR, "Failed to start keepalive scheduler thread\n");
                ast_sched_context_destroy(sched);
-               ao2_ref(transports, -1);
+               sched = NULL;
+               ao2_global_obj_release(monitored_transports);
                return AST_MODULE_LOAD_DECLINE;
        }
 
@@ -343,25 +368,38 @@ static int load_module(void)
 
        ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
        ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
+
        ast_module_shutdown_ref(ast_module_info->self);
        return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
-       pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
+       pjsip_tpmgr *tpmgr;
 
        if (keepalive_interval) {
                keepalive_interval = 0;
-               pthread_kill(keepalive_thread, SIGURG);
-               pthread_join(keepalive_thread, NULL);
+               if (keepalive_thread != AST_PTHREADT_NULL) {
+                       pthread_kill(keepalive_thread, SIGURG);
+                       pthread_join(keepalive_thread, NULL);
+                       keepalive_thread = AST_PTHREADT_NULL;
+               }
        }
 
-       ast_sched_context_destroy(sched);
-       ao2_ref(transports, -1);
+       ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
+
+       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
+       if (tpmgr) {
+               pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback);
+       }
 
        ast_sip_unregister_service(&idle_monitor_module);
-       pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback);
+
+       ast_sched_context_destroy(sched);
+       sched = NULL;
+
+       ao2_global_obj_release(monitored_transports);
+
        return 0;
 }