res_pjsip: PJSIP Transport state monitor refactor.
authorRichard Mudgett <rmudgett@digium.com>
Fri, 28 Jul 2017 23:26:17 +0000 (18:26 -0500)
committerRichard Mudgett <rmudgett@digium.com>
Thu, 10 Aug 2017 17:18:58 +0000 (12:18 -0500)
The fix for the issue is broken up into three parts.

This is part one which refactors the transport state monitor code to allow
more modules to be able to monitor transports.

* Pull the management of PJPROJECT's transport state callback code from
res_pjsip_transport_management.c into res_pjsip.  Now other modules can
dynamically add and remove themselves from transport monitoring without
worrying about breaking PJPROJECT's callback chain.

* Add the ability for other modules to get a callback whenever a specific
transport is shutdown.

ASTERISK-27147

Change-Id: I7d9a31371eb1487c9b7050cf82a9af5180a57912

include/asterisk/res_pjsip.h
res/res_pjsip.c
res/res_pjsip/include/res_pjsip_private.h
res/res_pjsip/pjsip_transport_events.c [new file with mode: 0644]
res/res_pjsip_transport_management.c

index 890ce59..31db367 100644 (file)
@@ -2926,4 +2926,91 @@ int ast_sip_dtmf_to_str(const enum ast_sip_dtmf_mode dtmf,
  */
 int ast_sip_str_to_dtmf(const char *dtmf_mode);
 
+/*!
+ * \brief Transport shutdown monitor callback.
+ * \since 13.18.0
+ *
+ * \param data User data to know what to do when transport shuts down.
+ *
+ * \note The callback does not need to care that data is an ao2 object.
+ *
+ * \return Nothing
+ */
+typedef void (*ast_transport_monitor_shutdown_cb)(void *data);
+
+enum ast_transport_monitor_reg {
+       /*! \brief Successfully registered the transport monitor */
+       AST_TRANSPORT_MONITOR_REG_SUCCESS,
+       /*! \brief Replaced the already existing transport monitor with new one. */
+       AST_TRANSPORT_MONITOR_REG_REPLACED,
+       /*!
+        * \brief Transport not found to monitor.
+        * \note Transport is either already shutdown or is not reliable.
+        */
+       AST_TRANSPORT_MONITOR_REG_NOT_FOUND,
+       /*! \brief Error while registering transport monitor. */
+       AST_TRANSPORT_MONITOR_REG_FAILED,
+};
+
+/*!
+ * \brief Register a reliable transport shutdown monitor callback.
+ * \since 13.18.0
+ *
+ * \param transport Transport to monitor for shutdown.
+ * \param cb Who to call when transport is shutdown.
+ * \param ao2_data Data to pass with the callback.
+ *
+ * \return enum ast_transport_monitor_reg
+ */
+enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,
+       ast_transport_monitor_shutdown_cb cb, void *ao2_data);
+
+/*!
+ * \brief Unregister a reliable transport shutdown monitor callback.
+ * \since 13.18.0
+ *
+ * \param transport Transport to monitor for shutdown.
+ * \param cb Who to call when transport is shutdown.
+ *
+ * \return Nothing
+ */
+void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb);
+
+/*!
+ * \brief Unregister monitor callback from all reliable transports.
+ * \since 13.18.0
+ *
+ * \param cb Who to call when a transport is shutdown.
+ *
+ * \return Nothing
+ */
+void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb);
+
+/*! Transport state notification registration element.  */
+struct ast_sip_tpmgr_state_callback {
+       /*! PJPROJECT transport state notification callback */
+       pjsip_tp_state_callback cb;
+       AST_LIST_ENTRY(ast_sip_tpmgr_state_callback) node;
+};
+
+/*!
+ * \brief Register a transport state notification callback element.
+ * \since 13.18.0
+ *
+ * \param element What we are registering.
+ *
+ * \return Nothing
+ */
+void ast_sip_transport_state_register(struct ast_sip_tpmgr_state_callback *element);
+
+/*!
+ * \brief Unregister a transport state notification callback element.
+ * \since 13.18.0
+ *
+ * \param element What we are unregistering.
+ *
+ * \return Nothing
+ */
+void ast_sip_transport_state_unregister(struct ast_sip_tpmgr_state_callback *element);
+
 #endif /* _RES_PJSIP_H */
index f3648ac..2917df3 100644 (file)
@@ -4662,6 +4662,7 @@ static int unload_pjsip(void *data)
                ast_sip_destroy_system();
                ast_sip_destroy_global_headers();
                internal_sip_unregister_service(&supplement_module);
+               ast_sip_destroy_transport_events();
        }
 
        if (monitor_thread) {
@@ -4740,7 +4741,6 @@ static int load_pjsip(void)
        return AST_MODULE_LOAD_SUCCESS;
 
 error:
-       unload_pjsip(NULL);
        return AST_MODULE_LOAD_DECLINE;
 }
 
@@ -4806,6 +4806,11 @@ static int load_module(void)
                goto error;
        }
 
+       if (ast_sip_initialize_transport_events()) {
+               ast_log(LOG_ERROR, "Failed to initialize SIP transport monitor. Aborting load\n");
+               goto error;
+       }
+
        ast_sip_initialize_dns();
 
        ast_sip_initialize_global_headers();
index 0bdb633..2969f0e 100644 (file)
@@ -135,6 +135,29 @@ void ast_sip_destroy_distributor(void);
 
 /*!
  * \internal
+ * \brief Initialize the transport events notify module
+ * \since 13.18.0
+ *
+ * The transport events notify module is responsible for monitoring
+ * when transports die and calling any registered callbacks when that
+ * happens.  It also manages any PJPROJECT transport state callbacks
+ * registered to it so the callbacks be more dynamic allowing module
+ * loading/unloading.
+ *
+ * \retval -1 Failure
+ * \retval 0 Success
+ */
+int ast_sip_initialize_transport_events(void);
+
+/*!
+ * \internal
+ * \brief Destruct the transport events notify module.
+ * \since 13.18.0
+ */
+void ast_sip_destroy_transport_events(void);
+
+/*!
+ * \internal
  * \brief Initialize global type on a sorcery instance
  *
  * \retval -1 failure
diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c
new file mode 100644 (file)
index 0000000..0f57303
--- /dev/null
@@ -0,0 +1,366 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2017, Digium Inc.
+ *
+ * Richard Mudgett <rmudgett@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ * \brief Manages the global transport event notification callbacks.
+ *
+ * \author Richard Mudgett <rmudgett@digium.com>
+ *     See Also:
+ *
+ * \arg \ref AstCREDITS
+ */
+
+
+#include "asterisk.h"
+
+#include "asterisk/res_pjsip.h"
+#include "include/res_pjsip_private.h"
+#include "asterisk/linkedlists.h"
+#include "asterisk/vector.h"
+
+/* ------------------------------------------------------------------- */
+
+/*! \brief Number of buckets for monitored active transports */
+#define ACTIVE_TRANSPORTS_BUCKETS 127
+
+/*! Who to notify when transport shuts down. */
+struct transport_monitor_notifier {
+       /*! Who to call when transport shuts down. */
+       ast_transport_monitor_shutdown_cb cb;
+       /*! ao2 data object to pass to callback. */
+       void *data;
+};
+
+/*! \brief Structure for transport to be monitored */
+struct transport_monitor {
+       /*! \brief The underlying PJSIP transport */
+       pjsip_transport *transport;
+       /*! Who is interested in when this transport shuts down. */
+       AST_VECTOR(, struct transport_monitor_notifier) monitors;
+};
+
+/*! \brief Global container of active reliable transports */
+static AO2_GLOBAL_OBJ_STATIC(active_transports);
+
+/*! \brief Existing transport events callback that we need to invoke */
+static pjsip_tp_state_callback tpmgr_state_callback;
+
+/*! List of registered transport state callbacks. */
+static AST_RWLIST_HEAD(, ast_sip_tpmgr_state_callback) transport_state_list;
+
+
+/*! \brief Hashing function for struct transport_monitor */
+AO2_STRING_FIELD_HASH_FN(transport_monitor, transport->obj_name);
+
+/*! \brief Comparison function for struct transport_monitor */
+AO2_STRING_FIELD_CMP_FN(transport_monitor, transport->obj_name);
+
+static const char *transport_state2str(pjsip_transport_state state)
+{
+       const char *name;
+
+       switch (state) {
+       case PJSIP_TP_STATE_CONNECTED:
+               name = "CONNECTED";
+               break;
+       case PJSIP_TP_STATE_DISCONNECTED:
+               name = "DISCONNECTED";
+               break;
+       case PJSIP_TP_STATE_SHUTDOWN:
+               name = "SHUTDOWN";
+               break;
+       case PJSIP_TP_STATE_DESTROY:
+               name = "DESTROY";
+               break;
+       default:
+               /*
+                * We have to have a default case because the enum is
+                * defined by a third-party library.
+                */
+               ast_assert(0);
+               name = "<unknown>";
+               break;
+       }
+       return name;
+}
+
+static void transport_monitor_dtor(void *vdoomed)
+{
+       struct transport_monitor *monitored = vdoomed;
+       int idx;
+
+       for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
+               struct transport_monitor_notifier *notifier;
+
+               notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+               ao2_cleanup(notifier->data);
+       }
+       AST_VECTOR_FREE(&monitored->monitors);
+}
+
+/*! \brief Callback invoked when transport state changes occur */
+static void transport_state_callback(pjsip_transport *transport,
+       pjsip_transport_state state, const pjsip_transport_state_info *info)
+{
+       struct ao2_container *transports;
+
+       /* We only care about monitoring reliable transports */
+       if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
+               && (transports = ao2_global_obj_ref(active_transports))) {
+               struct transport_monitor *monitored;
+
+               ast_debug(3, "Reliable transport '%s' state:%s\n",
+                       transport->obj_name, transport_state2str(state));
+               switch (state) {
+               case PJSIP_TP_STATE_CONNECTED:
+                       monitored = ao2_alloc_options(sizeof(*monitored),
+                               transport_monitor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+                       if (!monitored) {
+                               break;
+                       }
+                       monitored->transport = transport;
+                       if (AST_VECTOR_INIT(&monitored->monitors, 2)) {
+                               ao2_ref(monitored, -1);
+                               break;
+                       }
+
+                       ao2_link(transports, monitored);
+                       ao2_ref(monitored, -1);
+                       break;
+               case PJSIP_TP_STATE_DISCONNECTED:
+                       if (!transport->is_shutdown) {
+                               pjsip_transport_shutdown(transport);
+                       }
+                       break;
+               case PJSIP_TP_STATE_SHUTDOWN:
+                       /*
+                        * Set shutdown flag early so we can force a new transport to be
+                        * created if a monitor callback needs to reestablish a link.
+                        * PJPROJECT sets the flag after this routine returns even though
+                        * it has already called the transport's shutdown routine.
+                        */
+                       transport->is_shutdown = PJ_TRUE;
+
+                       monitored = ao2_find(transports, transport->obj_name,
+                               OBJ_SEARCH_KEY | OBJ_UNLINK);
+                       if (monitored) {
+                               int idx;
+
+                               for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
+                                       struct transport_monitor_notifier *notifier;
+
+                                       notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+                                       notifier->cb(notifier->data);
+                               }
+                               ao2_ref(monitored, -1);
+                       }
+                       break;
+               default:
+                       break;
+               }
+
+               ao2_ref(transports, -1);
+       }
+
+       /* Loop over other transport state callbacks registered with us. */
+       if (!AST_LIST_EMPTY(&transport_state_list)) {
+               struct ast_sip_tpmgr_state_callback *tpmgr_notifier;
+
+               AST_RWLIST_RDLOCK(&transport_state_list);
+               AST_LIST_TRAVERSE(&transport_state_list, tpmgr_notifier, node) {
+                       tpmgr_notifier->cb(transport, state, info);
+               }
+               AST_RWLIST_UNLOCK(&transport_state_list);
+       }
+
+       /* Forward to the old state callback if present */
+       if (tpmgr_state_callback) {
+               tpmgr_state_callback(transport, state, info);
+       }
+}
+
+static int transport_monitor_unregister_all(void *obj, void *arg, int flags)
+{
+       struct transport_monitor *monitored = obj;
+       ast_transport_monitor_shutdown_cb cb = arg;
+       int idx;
+
+       for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
+               struct transport_monitor_notifier *notifier;
+
+               notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+               if (notifier->cb == cb) {
+                       ao2_cleanup(notifier->data);
+                       AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);
+                       break;
+               }
+       }
+       return 0;
+}
+
+void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb)
+{
+       struct ao2_container *transports;
+
+       transports = ao2_global_obj_ref(active_transports);
+       if (!transports) {
+               return;
+       }
+       ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all,
+               cb);
+       ao2_ref(transports, -1);
+}
+
+void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb)
+{
+       struct ao2_container *transports;
+       struct transport_monitor *monitored;
+
+       transports = ao2_global_obj_ref(active_transports);
+       if (!transports) {
+               return;
+       }
+
+       ao2_lock(transports);
+       monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (monitored) {
+               int idx;
+
+               for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
+                       struct transport_monitor_notifier *notifier;
+
+                       notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+                       if (notifier->cb == cb) {
+                               ao2_cleanup(notifier->data);
+                               AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);
+                               break;
+                       }
+               }
+               ao2_ref(monitored, -1);
+       }
+       ao2_unlock(transports);
+       ao2_ref(transports, -1);
+}
+
+enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,
+       ast_transport_monitor_shutdown_cb cb, void *ao2_data)
+{
+       struct ao2_container *transports;
+       struct transport_monitor *monitored;
+       enum ast_transport_monitor_reg res = AST_TRANSPORT_MONITOR_REG_NOT_FOUND;
+
+       transports = ao2_global_obj_ref(active_transports);
+       if (!transports) {
+               return res;
+       }
+
+       ao2_lock(transports);
+       monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (monitored) {
+               int idx;
+               struct transport_monitor_notifier new_monitor;
+
+               /* Check if the callback monitor already exists */
+               for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
+                       struct transport_monitor_notifier *notifier;
+
+                       notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+                       if (notifier->cb == cb) {
+                               /* The monitor is already in the vector replace with new ao2_data. */
+                               ao2_replace(notifier->data, ao2_data);
+                               res = AST_TRANSPORT_MONITOR_REG_REPLACED;
+                               goto register_done;
+                       }
+               }
+
+               /* Add new monitor to vector */
+               new_monitor.cb = cb;
+               new_monitor.data = ao2_bump(ao2_data);
+               if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) {
+                       ao2_cleanup(ao2_data);
+                       res = AST_TRANSPORT_MONITOR_REG_FAILED;
+               }
+
+register_done:
+               ao2_ref(monitored, -1);
+       }
+       ao2_unlock(transports);
+       ao2_ref(transports, -1);
+       return res;
+}
+
+void ast_sip_transport_state_unregister(struct ast_sip_tpmgr_state_callback *element)
+{
+       AST_RWLIST_WRLOCK(&transport_state_list);
+       AST_LIST_REMOVE(&transport_state_list, element, node);
+       AST_RWLIST_UNLOCK(&transport_state_list);
+}
+
+void ast_sip_transport_state_register(struct ast_sip_tpmgr_state_callback *element)
+{
+       struct ast_sip_tpmgr_state_callback *tpmgr_notifier;
+
+       AST_RWLIST_WRLOCK(&transport_state_list);
+       AST_LIST_TRAVERSE(&transport_state_list, tpmgr_notifier, node) {
+               if (element == tpmgr_notifier) {
+                       /* Already registered. */
+                       AST_RWLIST_UNLOCK(&transport_state_list);
+                       return;
+               }
+       }
+       AST_LIST_INSERT_HEAD(&transport_state_list, element, node);
+       AST_RWLIST_UNLOCK(&transport_state_list);
+}
+
+void ast_sip_destroy_transport_events(void)
+{
+       pjsip_tpmgr *tpmgr;
+
+       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
+       if (tpmgr) {
+               pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback);
+       }
+
+       ao2_global_obj_release(active_transports);
+}
+
+int ast_sip_initialize_transport_events(void)
+{
+       pjsip_tpmgr *tpmgr;
+       struct ao2_container *transports;
+
+       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
+       if (!tpmgr) {
+               return -1;
+       }
+
+       transports = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
+               ACTIVE_TRANSPORTS_BUCKETS, transport_monitor_hash_fn, NULL,
+               transport_monitor_cmp_fn);
+       if (!transports) {
+               return -1;
+       }
+       ao2_global_obj_replace_unref(active_transports, transports);
+       ao2_ref(transports, -1);
+
+       tpmgr_state_callback = pjsip_tpmgr_get_state_cb(tpmgr);
+       pjsip_tpmgr_set_state_cb(tpmgr, &transport_state_callback);
+
+       return 0;
+}
index 86c53ca..eb92eb7 100644 (file)
@@ -34,7 +34,7 @@
 #include "asterisk/astobj2.h"
 
 /*! \brief Number of buckets for monitored transports */
-#define TRANSPORTS_BUCKETS 53
+#define TRANSPORTS_BUCKETS 127
 
 #define IDLE_TIMEOUT (pjsip_cfg()->tsx.td)
 
@@ -53,9 +53,6 @@ static pthread_t keepalive_thread = AST_PTHREADT_NULL;
 /*! \brief The global interval at which to send keepalives */
 static unsigned int keepalive_interval;
 
-/*! \brief Existing transport manager callback that we need to invoke */
-static pjsip_tp_state_callback tpmgr_state_callback;
-
 /*! \brief Structure for transport to be monitored */
 struct monitored_transport {
        /*! \brief The underlying PJSIP transport */
@@ -178,14 +175,13 @@ static void monitored_transport_state_callback(pjsip_transport *transport, pjsip
                                /* Let the scheduler inherit the reference from allocation */
                                if (ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, monitored, 1) < 0) {
                                        /* Uh Oh.  Could not schedule the idle check.  Kill the transport. */
-                                       ao2_unlink(transports, monitored);
-                                       ao2_ref(monitored, -1);
                                        pjsip_transport_shutdown(transport);
+                               } else {
+                                       /* monitored ref successfully passed to idle_sched_cb() */
+                                       break;
                                }
-                       } else {
-                               /* No scheduled task, so get rid of the allocation reference */
-                               ao2_ref(monitored, -1);
                        }
+                       ao2_ref(monitored, -1);
                        break;
                case PJSIP_TP_STATE_SHUTDOWN:
                case PJSIP_TP_STATE_DISCONNECTED:
@@ -197,13 +193,12 @@ static void monitored_transport_state_callback(pjsip_transport *transport, pjsip
 
                ao2_ref(transports, -1);
        }
-
-       /* Forward to the old state callback if present */
-       if (tpmgr_state_callback) {
-               tpmgr_state_callback(transport, state, info);
-       }
 }
 
+struct ast_sip_tpmgr_state_callback monitored_transport_reg = {
+       monitored_transport_state_callback,
+};
+
 /*! \brief Hashing function for monitored transport */
 static int monitored_transport_hash_fn(const void *obj, int flags)
 {
@@ -327,16 +322,9 @@ static pjsip_module idle_monitor_module = {
 static int load_module(void)
 {
        struct ao2_container *transports;
-       pjsip_tpmgr *tpmgr;
 
        CHECK_PJSIP_MODULE_LOADED();
 
-       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
-       if (!tpmgr) {
-               ast_log(LOG_ERROR, "No transport manager to attach keepalive functionality to.\n");
-               return AST_MODULE_LOAD_DECLINE;
-       }
-
        transports = ao2_container_alloc(TRANSPORTS_BUCKETS, monitored_transport_hash_fn,
                monitored_transport_cmp_fn);
        if (!transports) {
@@ -363,8 +351,7 @@ static int load_module(void)
 
        ast_sip_register_service(&idle_monitor_module);
 
-       tpmgr_state_callback = pjsip_tpmgr_get_state_cb(tpmgr);
-       pjsip_tpmgr_set_state_cb(tpmgr, &monitored_transport_state_callback);
+       ast_sip_transport_state_register(&monitored_transport_reg);
 
        ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
        ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
@@ -375,8 +362,6 @@ static int load_module(void)
 
 static int unload_module(void)
 {
-       pjsip_tpmgr *tpmgr;
-
        if (keepalive_interval) {
                keepalive_interval = 0;
                if (keepalive_thread != AST_PTHREADT_NULL) {
@@ -388,10 +373,7 @@ static int unload_module(void)
 
        ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
 
-       tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint());
-       if (tpmgr) {
-               pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback);
-       }
+       ast_sip_transport_state_unregister(&monitored_transport_reg);
 
        ast_sip_unregister_service(&idle_monitor_module);