pjsip_distributor.c: Consistently pick a serializer for messages.
authorRichard Mudgett <rmudgett@digium.com>
Thu, 26 May 2016 22:35:04 +0000 (17:35 -0500)
committerRichard Mudgett <rmudgett@digium.com>
Thu, 9 Jun 2016 15:32:06 +0000 (10:32 -0500)
Incoming messages that are not part of a dialog or a recognized response
to one of our requests need to be sent to a consistent serializer.  Under
load we may be queueing retransmissions before we can process the original
message.  We don't need to throw these messages onto random serializers
and cause reentrancy and message sequencing problems.

* Created a pool of pjsip/distributor serializers that get picked by
hashing the call-id and remote tag strings of the received messages.

* Made ast_sip_destroy_distributor() destroy items in the reverse order of
creation.

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: I2ce769389fc060d9f379977f559026fbcb632407

include/asterisk/res_pjsip.h
res/res_pjsip/pjsip_distributor.c

index 50d02d9..d1f0c98 100644 (file)
@@ -1301,6 +1301,17 @@ struct ast_serializer_shutdown_group;
 struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group);
 
 /*!
+ * \brief Determine the distributor serializer for the SIP message.
+ * \since 13.10.0
+ *
+ * \param rdata The incoming message.
+ *
+ * \retval Calculated distributor serializer on success.
+ * \retval NULL on error.
+ */
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata);
+
+/*!
  * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
  *
  * Passing a NULL serializer is a way to remove a serializer from a dialog.
index 288a3e0..75ae461 100644 (file)
@@ -59,6 +59,12 @@ struct unidentified_request{
        char src_name[];
 };
 
+/*! Number of serializers in pool if one not otherwise known.  (Best if prime number) */
+#define DISTRIBUTOR_POOL_SIZE          31
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
+
 /*!
  * \internal
  * \brief Record the task's serializer name on the tdata structure.
@@ -278,6 +284,83 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
        return dlg;
 }
 
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to add to the hash
+ * \param[in] hash The hash value to add to
+ *
+ * \details
+ * This version of the function is for when you need to compute a
+ * string hash of more than one string.
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * \sa http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash_add(pj_str_t *str, int hash)
+{
+       size_t len;
+       const char *pos;
+
+       len = pj_strlen(str);
+       pos = pj_strbuf(str);
+       while (len--) {
+               hash = hash * 33 ^ *pos++;
+       }
+
+       return hash;
+}
+
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to hash
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash(pj_str_t *str)
+{
+       return pjstr_hash_add(str, 5381);
+}
+
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
+{
+       int hash;
+       pj_str_t *remote_tag;
+       struct ast_taskprocessor *serializer;
+
+       if (!rdata->msg_info.msg) {
+               return NULL;
+       }
+
+       if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
+               remote_tag = &rdata->msg_info.from->tag;
+       } else {
+               remote_tag = &rdata->msg_info.to->tag;
+       }
+
+       /* Compute the hash from the SIP message call-id and remote-tag */
+       hash = pjstr_hash(&rdata->msg_info.cid->id);
+       hash = pjstr_hash_add(remote_tag, hash);
+       hash = abs(hash);
+
+       serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
+       if (serializer) {
+               ast_debug(3, "Calculated serializer %s to use for %s\n",
+                       ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
+       }
+       return serializer;
+}
+
 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
 
 static pjsip_module endpoint_mod = {
@@ -324,12 +407,23 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
                ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
                        pjsip_rx_data_get_info(rdata));
                serializer = find_request_serializer(rdata);
+               if (!serializer) {
+                       /*
+                        * Pick a serializer for the unmatched response.  Maybe
+                        * the stack can figure out what it is for, or we really
+                        * should just toss it regardless.
+                        */
+                       serializer = ast_sip_get_distributor_serializer(rdata);
+               }
        } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
                || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
                /* We have a BYE or CANCEL request without a serializer. */
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
                        PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
                return PJ_TRUE;
+       } else {
+               /* Pick a serializer for the out-of-dialog request. */
+               serializer = ast_sip_get_distributor_serializer(rdata);
        }
 
        pjsip_rx_data_clone(rdata, 0, &clone);
@@ -349,7 +443,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
                ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
                pjsip_rx_data_free_cloned(clone);
        } else {
-               ast_sip_push_task(serializer, distribute, clone);
+               if (ast_sip_push_task(serializer, distribute, clone)) {
+                       ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+                       pjsip_rx_data_free_cloned(clone);
+               }
        }
 
        ast_taskprocessor_unreference(serializer);
@@ -796,6 +893,7 @@ static int cli_unid_print_header(void *obj, void *arg, int flags)
 
        return 0;
 }
+
 static int cli_unid_print_body(void *obj, void *arg, int flags)
 {
        struct unidentified_request *unid = obj;
@@ -886,6 +984,47 @@ static struct ast_sorcery_observer global_observer = {
        .loaded = global_loaded,
 };
 
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \return Nothing
+ */
+static void distributor_pool_shutdown(void)
+{
+       int idx;
+
+       for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+               ast_taskprocessor_unreference(distributor_pool[idx]);
+               distributor_pool[idx] = NULL;
+       }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int distributor_pool_setup(void)
+{
+       char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+       int idx;
+
+       for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+               /* Create name with seq number appended. */
+               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
+
+               distributor_pool[idx] = ast_sip_create_serializer(tps_name);
+               if (!distributor_pool[idx]) {
+                       return -1;
+               }
+       }
+       return 0;
+}
 
 int ast_sip_initialize_distributor(void)
 {
@@ -895,6 +1034,11 @@ int ast_sip_initialize_distributor(void)
                return -1;
        }
 
+       if (distributor_pool_setup()) {
+               ast_sip_destroy_distributor();
+               return -1;
+       }
+
        prune_context = ast_sched_context_create();
        if (!prune_context) {
                ast_sip_destroy_distributor();
@@ -927,8 +1071,10 @@ int ast_sip_initialize_distributor(void)
                return -1;
        }
 
-       unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL);
+       unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
+               AO2_ALLOC_OPT_LOCK_NOLOCK);
        if (!unid_formatter) {
+               ast_sip_destroy_distributor();
                ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
                return -1;
        }
@@ -940,6 +1086,7 @@ int ast_sip_initialize_distributor(void)
        unid_formatter->get_id = cli_unid_get_id;
        unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
        ast_sip_register_cli_formatter(unid_formatter);
+
        ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
 
        return 0;
@@ -950,17 +1097,20 @@ void ast_sip_destroy_distributor(void)
        ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
        ast_sip_unregister_cli_formatter(unid_formatter);
 
-       internal_sip_unregister_service(&distributor_mod);
-       internal_sip_unregister_service(&endpoint_mod);
        internal_sip_unregister_service(&auth_mod);
+       internal_sip_unregister_service(&endpoint_mod);
+       internal_sip_unregister_service(&distributor_mod);
 
        ao2_cleanup(artificial_auth);
        ao2_cleanup(artificial_endpoint);
-       ao2_cleanup(unidentified_requests);
 
        ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
 
        if (prune_context) {
                ast_sched_context_destroy(prune_context);
        }
+
+       distributor_pool_shutdown();
+
+       ao2_cleanup(unidentified_requests);
 }