bridge: Hold off more than one imparting channel at a time.
authorRichard Mudgett <rmudgett@digium.com>
Tue, 19 Apr 2016 21:58:32 +0000 (16:58 -0500)
committerRichard Mudgett <rmudgett@digium.com>
Wed, 20 Apr 2016 20:44:21 +0000 (15:44 -0500)
An earlier patch blocked the ast_bridge_impart() call until the channel
either entered the target bridge or it failed.  Unfortuantely, if the
target bridge is stasis and the imprted channel is not a stasis channel,
stasis bounces the channel out of the bridge to come back into the bridge
as a proper stasis channel.  When the channel is bounced out, that
released the block on ast_bridge_impart() to continue.  If the impart was
a result of a transfer, then it became a race to see if the swap channel
would get hung up before the imparted channel could come back into the
stasis bridge.  If the imparted channel won then everything is fine.  If
the swap channel gets hung up first then the transfer will fail because
the swap channel is leaving the bridge.

* Allow a chain of ast_bridge_impart()'s to happen before any are
unblocked to prevent the race condition described above.  When the channel
finally joins the bridge or completely fails to join the bridge then the
ast_bridge_impart() instances are unblocked.

ASTERISK-25947
Reported by: Richard Mudgett

ASTERISK-24649
Reported by: John Bigelow

ASTERISK-24782
Reported by: John Bigelow

Change-Id: I8fef369171f295f580024ab4971e95c799d0dde1

include/asterisk/bridge_channel_internal.h
main/bridge.c
main/bridge_channel.c

index 7f7d5a8..fb8e781 100644 (file)
@@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel,
 void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel);
 
 /*!
- * \brief Internal bridge channel wait condition and associated result.
- */
-struct bridge_channel_internal_cond {
-       /*! Lock for the data structure */
-       ast_mutex_t lock;
-       /*! Wait condition */
-       ast_cond_t cond;
-       /*! Wait until done */
-       int done;
-       /*! The bridge channel */
-       struct ast_bridge_channel *bridge_channel;
-};
-
-/*!
- * \internal
- * \brief Wait for the expected signal.
- * \since 13.5.0
- *
- * \param cond the wait object
- *
- * \return Nothing
- */
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond);
-
-/*!
- * \internal
- * \brief Signal the condition wait.
- * \since 13.5.0
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
  *
- * \param cond the wait object
+ * \param chan Channel imparted that we need to signal.
  *
  * \return Nothing
  */
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
+void bridge_channel_impart_signal(struct ast_channel *chan);
 
 /*!
  * \internal
  * \brief Join the bridge_channel to the bridge (blocking)
  *
  * \param bridge_channel The Channel in the bridge
- * \param cond data used for signaling
  *
  * \note The bridge_channel->swap holds a channel reference for the swap
  * channel going into the bridging system.  The ref ensures that the swap
@@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
  * \retval 0 bridge channel successfully joined the bridge
  * \retval -1 bridge channel failed to join the bridge
  */
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
-                                struct bridge_channel_internal_cond *cond);
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel);
 
 /*!
  * \internal
index b9a436e..7e72052 100644 (file)
@@ -1481,6 +1481,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan)
        ao2_ref(bridge_channel, -1);
 }
 
+/*!
+ * \brief Internal bridge impart wait condition and associated conditional.
+ */
+struct bridge_channel_impart_cond {
+       AST_LIST_ENTRY(bridge_channel_impart_cond) node;
+       /*! Lock for the data structure */
+       ast_mutex_t lock;
+       /*! Wait condition */
+       ast_cond_t cond;
+       /*! Wait until done */
+       int done;
+};
+
+AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond);
+
+/*!
+ * \internal
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
+ *
+ * \param ds_head List of imparting threads to wake up.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head)
+{
+       if (ds_head) {
+               struct bridge_channel_impart_cond *cond;
+
+               while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) {
+                       ast_mutex_lock(&cond->lock);
+                       cond->done = 1;
+                       ast_cond_signal(&cond->cond);
+                       ast_mutex_unlock(&cond->lock);
+               }
+       }
+}
+
+static void bridge_channel_impart_ds_head_dtor(void *doomed)
+{
+       bridge_channel_impart_ds_head_signal(doomed);
+       ast_free(doomed);
+}
+
+/*!
+ * \internal
+ * \brief Fixup the bridge impart datastore.
+ * \since 13.9.0
+ *
+ * \param data Bridge impart datastore data to fixup from old_chan.
+ * \param old_chan The datastore is moving from this channel.
+ * \param new_chan The datastore is moving to this channel.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+{
+       /*
+        * Signal any waiting impart threads.  The masquerade is going to kill
+        * old_chan and we don't need to be waiting on new_chan.
+        */
+       bridge_channel_impart_ds_head_signal(data);
+}
+
+static const struct ast_datastore_info bridge_channel_impart_ds_info = {
+       .type = "bridge-impart-ds",
+       .destroy = bridge_channel_impart_ds_head_dtor,
+       .chan_fixup = bridge_channel_impart_ds_head_fixup,
+};
+
+/*!
+ * \internal
+ * \brief Add impart wait datastore conditional to channel.
+ * \since 13.9.0
+ *
+ * \param chan Channel to add the impart wait conditional.
+ * \param cond Imparting conditional to add.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond)
+{
+       struct ast_datastore *datastore;
+       struct bridge_channel_impart_ds_head *ds_head;
+
+       ast_channel_lock(chan);
+
+       datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+       if (!datastore) {
+               datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL);
+               if (!datastore) {
+                       ast_channel_unlock(chan);
+                       return -1;
+               }
+               ds_head = ast_calloc(1, sizeof(*ds_head));
+               if (!ds_head) {
+                       ast_channel_unlock(chan);
+                       ast_datastore_free(datastore);
+                       return -1;
+               }
+               datastore->data = ds_head;
+               ast_channel_datastore_add(chan, datastore);
+       } else {
+               ds_head = datastore->data;
+               ast_assert(ds_head != NULL);
+       }
+
+       AST_LIST_INSERT_TAIL(ds_head, cond, node);
+
+       ast_channel_unlock(chan);
+       return 0;
+}
+
+void bridge_channel_impart_signal(struct ast_channel *chan)
+{
+       struct ast_datastore *datastore;
+
+       ast_channel_lock(chan);
+       datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+       if (datastore) {
+               bridge_channel_impart_ds_head_signal(datastore->data);
+       }
+       ast_channel_unlock(chan);
+}
+
+/*!
+ * \internal
+ * \brief Block imparting channel thread until signaled.
+ * \since 13.9.0
+ *
+ * \param cond Imparting conditional to wait for.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond)
+{
+       ast_mutex_lock(&cond->lock);
+       while (!cond->done) {
+               ast_cond_wait(&cond->cond, &cond->lock);
+       }
+       ast_mutex_unlock(&cond->lock);
+}
+
 /*
  * XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back.
  *
@@ -1549,7 +1693,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
        }
 
        if (!res) {
-               res = bridge_channel_internal_join(bridge_channel, NULL);
+               res = bridge_channel_internal_join(bridge_channel);
        }
 
        /* Cleanup all the data in the bridge channel after it leaves the bridge. */
@@ -1566,6 +1710,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
 
 join_exit:;
        ast_bridge_run_after_callback(chan);
+       bridge_channel_impart_signal(chan);
        if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO)
                && !ast_bridge_setup_after_goto(chan)) {
                /* Claim the after bridge goto is an async goto destination. */
@@ -1579,14 +1724,13 @@ join_exit:;
 /*! \brief Thread responsible for imparted bridged channels to be departed */
 static void *bridge_channel_depart_thread(void *data)
 {
-       struct bridge_channel_internal_cond *cond = data;
-       struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+       struct ast_bridge_channel *bridge_channel = data;
 
        if (bridge_channel->callid) {
                ast_callid_threadassoc_add(bridge_channel->callid);
        }
 
-       bridge_channel_internal_join(bridge_channel, cond);
+       bridge_channel_internal_join(bridge_channel);
 
        /*
         * cleanup
@@ -1599,6 +1743,8 @@ static void *bridge_channel_depart_thread(void *data)
        bridge_channel->features = NULL;
 
        ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART);
+       /* If join failed there will be impart threads waiting. */
+       bridge_channel_impart_signal(bridge_channel->chan);
        ast_bridge_discard_after_goto(bridge_channel->chan);
 
        return NULL;
@@ -1607,15 +1753,14 @@ static void *bridge_channel_depart_thread(void *data)
 /*! \brief Thread responsible for independent imparted bridged channels */
 static void *bridge_channel_ind_thread(void *data)
 {
-       struct bridge_channel_internal_cond *cond = data;
-       struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+       struct ast_bridge_channel *bridge_channel = data;
        struct ast_channel *chan;
 
        if (bridge_channel->callid) {
                ast_callid_threadassoc_add(bridge_channel->callid);
        }
 
-       bridge_channel_internal_join(bridge_channel, cond);
+       bridge_channel_internal_join(bridge_channel);
        chan = bridge_channel->chan;
 
        /* cleanup */
@@ -1632,15 +1777,18 @@ static void *bridge_channel_ind_thread(void *data)
        ao2_ref(bridge_channel, -1);
 
        ast_bridge_run_after_callback(chan);
+       /* If join failed there will be impart threads waiting. */
+       bridge_channel_impart_signal(chan);
        ast_bridge_run_after_goto(chan);
        return NULL;
 }
 
-int ast_bridge_impart(struct ast_bridge *bridge,
+static int bridge_impart_internal(struct ast_bridge *bridge,
        struct ast_channel *chan,
        struct ast_channel *swap,
        struct ast_bridge_features *features,
-       enum ast_bridge_impart_flags flags)
+       enum ast_bridge_impart_flags flags,
+       struct bridge_channel_impart_cond *cond)
 {
        int res = 0;
        struct ast_bridge_channel *bridge_channel;
@@ -1699,27 +1847,20 @@ int ast_bridge_impart(struct ast_bridge *bridge,
 
        /* Actually create the thread that will handle the channel */
        if (!res) {
-               struct bridge_channel_internal_cond cond = {
-                       .done = 0,
-                       .bridge_channel = bridge_channel
-               };
-               ast_mutex_init(&cond.lock);
-               ast_cond_init(&cond.cond, NULL);
-
+               res = bridge_channel_impart_add(chan, cond);
+       }
+       if (!res) {
                if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) {
                        res = ast_pthread_create_detached(&bridge_channel->thread, NULL,
-                               bridge_channel_ind_thread, &cond);
+                               bridge_channel_ind_thread, bridge_channel);
                } else {
                        res = ast_pthread_create(&bridge_channel->thread, NULL,
-                               bridge_channel_depart_thread, &cond);
+                               bridge_channel_depart_thread, bridge_channel);
                }
 
                if (!res) {
-                       bridge_channel_internal_wait(&cond);
+                       bridge_channel_impart_wait(cond);
                }
-
-               ast_cond_destroy(&cond.cond);
-               ast_mutex_destroy(&cond.lock);
        }
 
        if (res) {
@@ -1740,6 +1881,32 @@ int ast_bridge_impart(struct ast_bridge *bridge,
        return 0;
 }
 
+int ast_bridge_impart(struct ast_bridge *bridge,
+       struct ast_channel *chan,
+       struct ast_channel *swap,
+       struct ast_bridge_features *features,
+       enum ast_bridge_impart_flags flags)
+{
+       struct bridge_channel_impart_cond cond = {
+               .done = 0,
+       };
+       int res;
+
+       ast_mutex_init(&cond.lock);
+       ast_cond_init(&cond.cond, NULL);
+
+       res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond);
+       if (res) {
+               /* Impart failed.  Signal any other waiting impart threads */
+               bridge_channel_impart_signal(chan);
+       }
+
+       ast_cond_destroy(&cond.cond);
+       ast_mutex_destroy(&cond.lock);
+
+       return res;
+}
+
 int ast_bridge_depart(struct ast_channel *chan)
 {
        struct ast_bridge_channel *bridge_channel;
index 66f26ee..db4ecfe 100644 (file)
@@ -2637,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch
        ao2_iterator_destroy(&iter);
 }
 
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond)
-{
-       ast_mutex_lock(&cond->lock);
-       while (!cond->done) {
-               ast_cond_wait(&cond->cond, &cond->lock);
-       }
-       ast_mutex_unlock(&cond->lock);
-}
-
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond)
-{
-       if (cond) {
-               ast_mutex_lock(&cond->lock);
-               cond->done = 1;
-               ast_cond_signal(&cond->cond);
-               ast_mutex_unlock(&cond->lock);
-       }
-}
-
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
-                                struct bridge_channel_internal_cond *cond)
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel)
 {
        int res = 0;
        struct ast_bridge_features *channel_features;
@@ -2687,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
                        bridge_channel->bridge->uniqueid,
                        bridge_channel,
                        ast_channel_name(bridge_channel->chan));
-               bridge_channel_internal_signal(cond);
                return -1;
        }
        ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge);
@@ -2722,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
        }
        bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp);
 
-       bridge_channel_internal_signal(cond);
-
        if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) {
                /*
                 * Indicate a source change since this channel is entering the
@@ -2735,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
                        ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE);
                }
 
+               bridge_channel_impart_signal(bridge_channel->chan);
                ast_bridge_unlock(bridge_channel->bridge);
 
                /* Must release any swap ref after unlocking the bridge. */