res_pjsip: Refactor endpt_send_transaction (qualify_timeout)
authorGeorge Joseph <george.joseph@fairview5.com>
Wed, 20 May 2015 16:11:20 +0000 (10:11 -0600)
committerGeorge Joseph <george.joseph@fairview5.com>
Fri, 22 May 2015 15:17:32 +0000 (10:17 -0500)
This patch refactors the transaction timeout processing to eliminate
calling the lower level public pjsip functions and reverts to calling
pjsip_endpt_send_request again.  This is the result of me noticing
a possible incompatibility with pjproject-2.4 which was causing
contact status flapping.

The original version of this feature used the lower level calls to
get access to the tsx structure in order to cancel the transaction
when our own timer expires. Since we no longer have that access,
if our own timer expires before the pjsip timer, we call the callbacks
and just let the pjsip transaction take it's own course.  When the
transaction ends, it discovers the callbacks have already been run
and just cleans itself up.

A few messages in pjsip_configuration were also added/cleaned up.

ASTERISK-25105 #close

Change-Id: I0810f3999cf63f3a72607bbecac36af0a957f33e
Reported-by: George Joseph <george.joseph@fairview5.com>
Tested-by: George Joseph <george.joseph@fairview5.com>

include/asterisk/res_pjsip.h
res/res_pjsip.c
res/res_pjsip/pjsip_configuration.c

index 67c9c4b..4023014 100644 (file)
@@ -1294,6 +1294,13 @@ int ast_sip_send_request(pjsip_tx_data *tdata, struct pjsip_dialog *dlg,
  *
  * \retval 0 Success
  * \retval -1 Failure (out-of-dialog callback will not be called.)
+ *
+ * \note Timeout processing:
+ * There are 2 timers associated with this request, PJSIP timer_b which is
+ * set globally in the "system" section of pjsip.conf, and the timeout specified
+ * on this call.  The timer that expires first (before normal completion) will
+ * cause the callback to be run with e->body.tsx_state.type = PJSIP_EVENT_TIMER.
+ * The timer that expires second is simply ignored and the callback is not run again.
  */
 int ast_sip_send_out_of_dialog_request(pjsip_tx_data *tdata,
        struct ast_sip_endpoint *endpoint, int timeout, void *token,
index 3582fae..7bf4897 100644 (file)
@@ -2842,126 +2842,6 @@ static pj_bool_t does_method_match(const pj_str_t *message_method, const char *s
 #define TIMER_INACTIVE         0
 #define TIMEOUT_TIMER2         5
 
-struct tsx_data {
-       void *token;
-       void (*cb)(void*, pjsip_event*);
-       pjsip_transaction *tsx;
-       pj_timer_entry *timeout_timer;
-};
-
-static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event);
-
-pjsip_module send_tsx_module = {
-    .name = { "send_tsx_module", 23 },
-    .id = -1,
-    .priority = PJSIP_MOD_PRIORITY_APPLICATION,
-    .on_tsx_state = &send_tsx_on_tsx_state,
-};
-
-/*! \brief This is the pjsip_tsx_send_msg callback */
-static void send_tsx_on_tsx_state(pjsip_transaction *tsx, pjsip_event *event)
-{
-       struct tsx_data *tsx_data;
-
-       if (event->type != PJSIP_EVENT_TSX_STATE) {
-               return;
-       }
-
-       tsx_data = (struct tsx_data*) tsx->mod_data[send_tsx_module.id];
-       if (tsx_data == NULL) {
-               return;
-       }
-
-       if (tsx->status_code < 200) {
-               return;
-       }
-
-       if (event->body.tsx_state.type == PJSIP_EVENT_TIMER) {
-               ast_debug(1, "PJSIP tsx timer expired\n");
-       }
-
-       if (tsx_data->timeout_timer && tsx_data->timeout_timer->id != TIMER_INACTIVE) {
-               pj_mutex_lock(tsx->mutex_b);
-               pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt),
-                       tsx_data->timeout_timer, TIMER_INACTIVE);
-               pj_mutex_unlock(tsx->mutex_b);
-       }
-
-       /* Call the callback, if any, and prevent the callback from being called again
-        * by clearing the transaction's module_data.
-        */
-       tsx->mod_data[send_tsx_module.id] = NULL;
-
-       if (tsx_data->cb) {
-               (*tsx_data->cb)(tsx_data->token, event);
-       }
-}
-
-static void tsx_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry)
-{
-       struct tsx_data *tsx_data = entry->user_data;
-
-       entry->id = TIMER_INACTIVE;
-       ast_debug(1, "Internal tsx timer expired\n");
-       pjsip_tsx_terminate(tsx_data->tsx, PJSIP_SC_TSX_TIMEOUT);
-}
-
-static pj_status_t endpt_send_transaction(pjsip_endpoint *endpt,
-       pjsip_tx_data *tdata, int timeout, void *token,
-       pjsip_endpt_send_callback cb)
-{
-       pjsip_transaction *tsx;
-       struct tsx_data *tsx_data;
-       pj_status_t status;
-       pjsip_event event;
-
-       ast_assert(endpt && tdata);
-
-       status = pjsip_tsx_create_uac(&send_tsx_module, tdata, &tsx);
-       if (status != PJ_SUCCESS) {
-               pjsip_tx_data_dec_ref(tdata);
-               ast_log(LOG_ERROR, "Unable to create pjsip uac\n");
-               return status;
-       }
-
-       tsx_data = PJ_POOL_ALLOC_T(tsx->pool, struct tsx_data);
-       tsx_data->token = token;
-       tsx_data->cb = cb;
-       tsx_data->tsx = tsx;
-       if (timeout > 0) {
-               tsx_data->timeout_timer = PJ_POOL_ALLOC_T(tsx->pool, pj_timer_entry);
-       } else {
-               tsx_data->timeout_timer = NULL;
-       }
-       tsx->mod_data[send_tsx_module.id] = tsx_data;
-
-       PJSIP_EVENT_INIT_TX_MSG(event, tdata);
-       pjsip_tx_data_set_transport(tdata, &tsx->tp_sel);
-
-       if (timeout > 0) {
-               pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 };
-
-               pj_timer_entry_init(tsx_data->timeout_timer, TIMEOUT_TIMER2,
-                       tsx_data, &tsx_timer_callback);
-               pj_mutex_lock(tsx->mutex_b);
-               pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(tsx->endpt),
-                       tsx_data->timeout_timer, TIMER_INACTIVE);
-               pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(tsx->endpt),
-                       tsx_data->timeout_timer, &timeout_timer_val);
-               tsx_data->timeout_timer->id = TIMEOUT_TIMER2;
-               pj_mutex_unlock(tsx->mutex_b);
-       }
-
-       status = (*tsx->state_handler)(tsx, &event);
-       pjsip_tx_data_dec_ref(tdata);
-       if (status != PJ_SUCCESS) {
-               ast_log(LOG_ERROR, "Unable to send message\n");
-               return status;
-       }
-
-       return status;
-}
-
 /*! \brief Structure to hold information about an outbound request */
 struct send_request_data {
        /*! The endpoint associated with this request */
@@ -3006,41 +2886,212 @@ struct send_request_wrapper {
        void (*callback)(void *token, pjsip_event *e);
        /*! Non-zero when the callback is called. */
        unsigned int cb_called;
+       /*! Timeout timer. */
+       pj_timer_entry *timeout_timer;
+       /*! Original timeout. */
+       pj_int32_t timeout;
+       /*! Timeout/cleanup lock. */
+       pj_mutex_t *lock;
+       /*! The transmit data. */
+       pjsip_tx_data *tdata;
 };
 
-static void endpt_send_request_wrapper(void *token, pjsip_event *e)
+/*! \internal This function gets called by pjsip when the transaction ends,
+ * even if it timed out.  The lock prevents a race condition if both the pjsip
+ * transaction timer and our own timer expire simultaneously.
+ */
+static void endpt_send_request_cb(void *token, pjsip_event *e)
 {
        struct send_request_wrapper *req_wrapper = token;
 
-       req_wrapper->cb_called = 1;
-       if (req_wrapper->callback) {
+       if (e->body.tsx_state.type == PJSIP_EVENT_TIMER) {
+               ast_debug(2, "%p: PJSIP tsx timer expired\n", req_wrapper);
+
+               if (req_wrapper->timeout_timer
+                       && req_wrapper->timeout_timer->id != TIMEOUT_TIMER2) {
+                       ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+                       ao2_ref(req_wrapper, -1);
+                       return;
+               }
+       } else {
+               ast_debug(2, "%p: PJSIP tsx response received\n", req_wrapper);
+       }
+
+       pj_mutex_lock(req_wrapper->lock);
+
+       /* It's possible that our own timer was already processing while
+        * we were waiting on the lock so check the timer id.  If it's
+        * still TIMER2 then we still need to process.
+        */
+       if (req_wrapper->timeout_timer
+               && req_wrapper->timeout_timer->id == TIMEOUT_TIMER2) {
+               int timers_cancelled = 0;
+
+               ast_debug(3, "%p: Cancelling timer\n", req_wrapper);
+
+               timers_cancelled = pj_timer_heap_cancel_if_active(
+                       pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()),
+                       req_wrapper->timeout_timer, TIMER_INACTIVE);
+
+               if (timers_cancelled > 0) {
+                       /* If the timer was cancelled the callback will never run so
+                        * clean up its reference to the wrapper.
+                        */
+                       ast_debug(3, "%p: Timer cancelled\n", req_wrapper);
+                       ao2_ref(req_wrapper, -1);
+               } else {
+                       /* If it wasn't cancelled, it MAY be in the callback already
+                        * waiting on the lock so set the id to INACTIVE so
+                        * when the callback comes out of the lock, it knows to not
+                        * proceed.
+                        */
+                       ast_debug(3, "%p: Timer already expired\n", req_wrapper);
+                       req_wrapper->timeout_timer->id = TIMER_INACTIVE;
+               }
+       }
+
+       /* It's possible that our own timer expired and called the callbacks
+        * so no need to call them again.
+        */
+       if (!req_wrapper->cb_called && req_wrapper->callback) {
                req_wrapper->callback(req_wrapper->token, e);
+               req_wrapper->cb_called = 1;
+               ast_debug(2, "%p: Callbacks executed\n", req_wrapper);
        }
+       pj_mutex_unlock(req_wrapper->lock);
        ao2_ref(req_wrapper, -1);
 }
 
+/*! \internal This function gets called by our own timer when it expires.
+ * If the timer is cancelled however, the function does NOT get called.
+ * The lock prevents a race condition if both the pjsip transaction timer
+ * and our own timer expire simultaneously.
+ */
+static void send_request_timer_callback(pj_timer_heap_t *theap, pj_timer_entry *entry)
+{
+       pjsip_event event;
+       struct send_request_wrapper *req_wrapper = entry->user_data;
+
+       ast_debug(2, "%p: Internal tsx timer expired after %d msec\n",
+               req_wrapper, req_wrapper->timeout);
+
+       pj_mutex_lock(req_wrapper->lock);
+       /* If the id is not TIMEOUT_TIMER2 then the timer was cancelled above
+        * while the lock was being held so just clean up.
+        */
+       if (entry->id != TIMEOUT_TIMER2) {
+               pj_mutex_unlock(req_wrapper->lock);
+               ast_debug(3, "%p: Timeout already handled\n", req_wrapper);
+               ao2_ref(req_wrapper, -1);
+               return;
+       }
+
+       ast_debug(3, "%p: Timer handled here\n", req_wrapper);
+
+       PJSIP_EVENT_INIT_TX_MSG(event, req_wrapper->tdata);
+       event.body.tsx_state.type = PJSIP_EVENT_TIMER;
+       entry->id = TIMER_INACTIVE;
+
+       if (!req_wrapper->cb_called && req_wrapper->callback) {
+               req_wrapper->callback(req_wrapper->token, &event);
+               req_wrapper->cb_called = 1;
+               ast_debug(2, "%p: Callbacks executed\n", req_wrapper);
+       }
+
+       pj_mutex_unlock(req_wrapper->lock);
+       ao2_ref(req_wrapper, -1);
+}
+
+static void send_request_wrapper_destructor(void *obj)
+{
+       struct send_request_wrapper *req_wrapper = obj;
+
+       pj_mutex_destroy(req_wrapper->lock);
+       pjsip_tx_data_dec_ref(req_wrapper->tdata);
+       ast_debug(2, "%p: wrapper destroyed\n", req_wrapper);
+}
+
 static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint,
-       pjsip_tx_data *tdata, int timeout, void *token, pjsip_endpt_send_callback cb)
+       pjsip_tx_data *tdata, pj_int32_t timeout, void *token, pjsip_endpt_send_callback cb)
 {
        struct send_request_wrapper *req_wrapper;
        pj_status_t ret_val;
+       pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
 
        /* Create wrapper to detect if the callback was actually called on an error. */
-       req_wrapper = ao2_alloc_options(sizeof(*req_wrapper), NULL,
+       req_wrapper = ao2_alloc_options(sizeof(*req_wrapper), send_request_wrapper_destructor,
                AO2_ALLOC_OPT_LOCK_NOLOCK);
        if (!req_wrapper) {
                pjsip_tx_data_dec_ref(tdata);
                return PJ_ENOMEM;
        }
+
+       ast_debug(2, "%p: Wrapper created\n", req_wrapper);
+
        req_wrapper->token = token;
        req_wrapper->callback = cb;
+       req_wrapper->timeout = timeout;
+       req_wrapper->timeout_timer = NULL;
+       req_wrapper->lock = NULL;
+       req_wrapper->tdata = tdata;
+       /* Add a reference to tdata.  The wrapper destructor cleans it up. */
+       pjsip_tx_data_add_ref(tdata);
+
+       ret_val = pj_mutex_create_simple(tdata->pool, "tsx_timeout", &req_wrapper->lock);
+       if (ret_val != PJ_SUCCESS) {
+               char errmsg[PJ_ERR_MSG_SIZE];
+               pj_strerror(ret_val, errmsg, sizeof(errmsg));
+               ast_log(LOG_ERROR, "Error %d '%s' sending %.*s request to endpoint %s\n",
+                       (int) ret_val, errmsg, (int) pj_strlen(&tdata->msg->line.req.method.name),
+                       pj_strbuf(&tdata->msg->line.req.method.name),
+                       endpoint ? ast_sorcery_object_get_id(endpoint) : "<unknown>");
+               pjsip_tx_data_dec_ref(tdata);
+               ao2_ref(req_wrapper, -1);
+               return PJ_ENOMEM;
+       }
+
+       pj_mutex_lock(req_wrapper->lock);
+
+       if (timeout > 0) {
+               pj_time_val timeout_timer_val = { timeout / 1000, timeout % 1000 };
+
+               req_wrapper->timeout_timer = PJ_POOL_ALLOC_T(tdata->pool, pj_timer_entry);
+
+               ast_debug(2, "%p: Set timer to %d msec\n", req_wrapper, timeout);
 
+               pj_timer_entry_init(req_wrapper->timeout_timer, TIMEOUT_TIMER2,
+                       req_wrapper, &send_request_timer_callback);
+
+               pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+                       req_wrapper->timeout_timer, TIMER_INACTIVE);
+
+               /* We need to insure that the wrapper and tdata are available if/when the
+                * timer callback is executed.
+                */
+               ao2_ref(req_wrapper, +1);
+               pj_timer_heap_schedule(pjsip_endpt_get_timer_heap(endpt),
+                       req_wrapper->timeout_timer, &timeout_timer_val);
+
+               req_wrapper->timeout_timer->id = TIMEOUT_TIMER2;
+       } else {
+               req_wrapper->timeout_timer = NULL;
+       }
+
+       /* We need to insure that the wrapper and tdata are available when the
+        * transaction callback is executed.
+        */
        ao2_ref(req_wrapper, +1);
-       ret_val = endpt_send_transaction(ast_sip_get_pjsip_endpoint(), tdata, timeout,
-               req_wrapper, endpt_send_request_wrapper);
+
+       ret_val = pjsip_endpt_send_request(endpt, tdata, -1, req_wrapper, endpt_send_request_cb);
        if (ret_val != PJ_SUCCESS) {
                char errmsg[PJ_ERR_MSG_SIZE];
 
+               if (timeout > 0) {
+                       pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+                               req_wrapper->timeout_timer, TIMER_INACTIVE);
+                       ao2_ref(req_wrapper, -1);
+               }
+
                /* Complain of failure to send the request. */
                pj_strerror(ret_val, errmsg, sizeof(errmsg));
                ast_log(LOG_ERROR, "Error %d '%s' sending %.*s request to endpoint %s\n",
@@ -3061,6 +3112,7 @@ static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint,
                        ao2_ref(req_wrapper, -1);
                }
        }
+       pj_mutex_unlock(req_wrapper->lock);
        ao2_ref(req_wrapper, -1);
        return ret_val;
 }
@@ -3076,10 +3128,6 @@ static void send_request_cb(void *token, pjsip_event *e)
        int res;
 
        switch(e->body.tsx_state.type) {
-       case PJSIP_EVENT_USER:
-               /* Map USER (transaction cancelled by timeout) to TIMER */
-               e->body.tsx_state.type = PJSIP_EVENT_TIMER;
-               break;
        case PJSIP_EVENT_TRANSPORT_ERROR:
        case PJSIP_EVENT_TIMER:
                break;
@@ -3695,25 +3743,8 @@ static int load_module(void)
                return AST_MODULE_LOAD_DECLINE;
        }
 
-       if (internal_sip_register_service(&send_tsx_module)) {
-               ast_log(LOG_ERROR, "Failed to initialize send request module. Aborting load\n");
-               internal_sip_unregister_service(&supplement_module);
-               ast_sip_destroy_distributor();
-               ast_res_pjsip_destroy_configuration();
-               ast_sip_destroy_global_headers();
-               stop_monitor_thread();
-               ast_sip_destroy_system();
-               pj_pool_release(memory_pool);
-               memory_pool = NULL;
-               pjsip_endpt_destroy(ast_pjsip_endpoint);
-               ast_pjsip_endpoint = NULL;
-               pj_caching_pool_destroy(&caching_pool);
-               return AST_MODULE_LOAD_DECLINE;
-       }
-
        if (internal_sip_initialize_outbound_authentication()) {
                ast_log(LOG_ERROR, "Failed to initialize outbound authentication. Aborting load\n");
-               internal_sip_unregister_service(&send_tsx_module);
                internal_sip_unregister_service(&supplement_module);
                ast_sip_destroy_distributor();
                ast_res_pjsip_destroy_configuration();
@@ -3757,7 +3788,6 @@ static int unload_pjsip(void *data)
        ast_res_pjsip_destroy_configuration();
        ast_sip_destroy_system();
        ast_sip_destroy_global_headers();
-       internal_sip_unregister_service(&send_tsx_module);
        internal_sip_unregister_service(&supplement_module);
        if (monitor_thread) {
                stop_monitor_thread();
index f147b34..9fa18c7 100644 (file)
@@ -121,9 +121,20 @@ static int persistent_endpoint_update_state(void *obj, void *arg, int flags)
 /*! \brief Function called when stuff relating to a contact happens (created/deleted) */
 static void persistent_endpoint_contact_created_observer(const void *object)
 {
-       char *id = ast_strdupa(ast_sorcery_object_get_id(object)), *aor = NULL;
+       char *id = ast_strdupa(ast_sorcery_object_get_id(object));
+       char *aor = NULL;
+       char *contact = NULL;
+
+       aor = id;
+       /* Dynamic contacts are delimited with ";@" and static ones with "@@" */
+       if ((contact = strstr(id, ";@")) || (contact = strstr(id, "@@"))) {
+               *contact = '\0';
+               contact += 2;
+       } else {
+               contact = id;
+       }
 
-       aor = strsep(&id, ";@");
+       ast_verb(1, "Contact %s/%s has been created\n", aor, contact);
 
        ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor);
 }
@@ -144,7 +155,7 @@ static void persistent_endpoint_contact_deleted_observer(const void *object)
                contact = id;
        }
 
-       ast_verb(1, "Contact %s/%s is now Unavailable\n", aor, contact);
+       ast_verb(1, "Contact %s/%s has been deleted\n", aor, contact);
 
        ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, aor);
 }