ARI: Add ability to raise arbitrary User Events
[asterisk/asterisk.git] / main / bridge_channel.c
index 96bfb20..2b37f25 100644 (file)
@@ -35,6 +35,7 @@
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #include <signal.h>
+#include <semaphore.h>
 
 #include "asterisk/heap.h"
 #include "asterisk/astobj2.h"
@@ -70,6 +71,142 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  */
 typedef int (*ast_bridge_channel_post_action_data)(struct ast_bridge_channel *bridge_channel, enum bridge_channel_action_type action, const void *data, size_t datalen);
 
+/*!
+ * \brief Counter used for assigning synchronous bridge action IDs
+ */
+static int sync_ids;
+
+/*!
+ * \brief Frame payload for synchronous bridge actions.
+ *
+ * The payload serves as a wrapper around the actual payload of the
+ * frame, with the addition of an id used to find the associated
+ * bridge_sync object.
+ */
+struct sync_payload {
+       /*! Unique ID for this synchronous action */
+       unsigned int id;
+       /*! Actual frame data to process */
+       unsigned char data[0];
+};
+
+/*!
+ * \brief Synchronous bridge action object.
+ *
+ * Synchronous bridge actions require the ability for one thread to wait
+ * and for another thread to indicate that the action has completed. This
+ * structure facilitates that goal by providing synchronization structures.
+ */
+struct bridge_sync {
+       /*! Unique ID of this synchronization object. Corresponds with ID in synchronous frame payload */
+       unsigned int id;
+       /*! Semaphore used for synchronization */
+       sem_t sem;
+       /*! Pointer to next entry in the list */
+       AST_LIST_ENTRY(bridge_sync) list;
+};
+
+/*!
+ * \brief List holding active synchronous action objects.
+ */
+static AST_RWLIST_HEAD_STATIC(sync_structs, bridge_sync);
+
+/*!
+ * \brief initialize a synchronous bridge object.
+ *
+ * This both initializes the structure and adds it to the list of
+ * synchronization structures.
+ *
+ * \param sync_struct The synchronization object to initialize.
+ * \param id ID to assign to the synchronization object.
+ */
+static void bridge_sync_init(struct bridge_sync *sync_struct, unsigned int id)
+{
+       memset(sync_struct, 0, sizeof(*sync_struct));
+       sync_struct->id = id;
+       sem_init(&sync_struct->sem, 0, 0);
+
+       AST_RWLIST_WRLOCK(&sync_structs);
+       AST_RWLIST_INSERT_TAIL(&sync_structs, sync_struct, list);
+       AST_RWLIST_UNLOCK(&sync_structs);
+}
+
+/*!
+ * \brief Clean up a syncrhonization bridge object.
+ *
+ * This frees fields within the synchronization object and removes
+ * it from the list of active synchronization objects.
+ *
+ * Since synchronization objects are stack-allocated, it is vital
+ * that this is called before the synchronization object goes
+ * out of scope.
+ *
+ * \param sync_struct Synchronization object to clean up.
+ */
+static void bridge_sync_cleanup(struct bridge_sync *sync_struct)
+{
+       struct bridge_sync *iter;
+
+       AST_RWLIST_WRLOCK(&sync_structs);
+       AST_LIST_TRAVERSE_SAFE_BEGIN(&sync_structs, iter, list) {
+               if (iter->id == sync_struct->id) {
+                       AST_LIST_REMOVE_CURRENT(list);
+                       break;
+               }
+       }
+       AST_LIST_TRAVERSE_SAFE_END;
+       AST_RWLIST_UNLOCK(&sync_structs);
+
+       sem_destroy(&sync_struct->sem);
+}
+
+/*!
+ * \brief Failsafe for synchronous bridge action waiting.
+ *
+ * When waiting for a synchronous bridge action to complete,
+ * if there is a frame resource leak somewhere, it is possible
+ * that we will never get notified that the synchronous action
+ * completed.
+ *
+ * If a significant amount of time passes, then we will abandon
+ * waiting for the synchrnous bridge action to complete.
+ *
+ * This constant represents the number of milliseconds we will
+ * wait for the bridge action to complete.
+ */
+#define PLAYBACK_TIMEOUT (600 * 1000)
+
+/*!
+ * \brief Wait for a synchronous bridge action to complete.
+ *
+ * \param sync_struct Synchronization object corresponding to the bridge action.
+ */
+static void bridge_sync_wait(struct bridge_sync *sync_struct)
+{
+       struct timeval timeout_val = ast_tvadd(ast_tvnow(), ast_samp2tv(PLAYBACK_TIMEOUT, 1000));
+       struct timespec timeout_spec = {
+               .tv_sec = timeout_val.tv_sec,
+               .tv_nsec = timeout_val.tv_usec * 1000,
+       };
+
+       sem_timedwait(&sync_struct->sem, &timeout_spec);
+}
+
+/*!
+ * \brief Signal that waiting for a synchronous bridge action is no longer necessary.
+ *
+ * This may occur for several reasons
+ * \li The synchronous bridge action has completed.
+ * \li The bridge channel has been removed from the bridge.
+ * \li The synchronous bridge action could not be queued.
+ *
+ * \param sync_struct Synchronization object corresponding to the bridge action.
+ */
+static void bridge_sync_signal(struct bridge_sync *sync_struct)
+{
+       sem_post(&sync_struct->sem);
+}
+
 void ast_bridge_channel_lock_bridge(struct ast_bridge_channel *bridge_channel)
 {
        struct ast_bridge *bridge;
@@ -146,7 +283,7 @@ void ast_bridge_channel_leave_bridge_nolock(struct ast_bridge_channel *bridge_ch
                return;
        }
 
-       ast_debug(1, "Setting %p(%s) state from:%d to:%d\n",
+       ast_debug(1, "Setting %p(%s) state from:%u to:%u\n",
                bridge_channel, ast_channel_name(bridge_channel->chan), bridge_channel->state,
                new_state);
 
@@ -222,25 +359,28 @@ void ast_bridge_channel_update_linkedids(struct ast_bridge_channel *bridge_chann
 {
        struct ast_bridge_channel *other = NULL;
        struct ast_bridge *bridge = bridge_channel->bridge;
-       const char *oldest_linkedid = ast_channel_linkedid(bridge_channel->chan);
+       struct ast_channel *oldest_linkedid_chan = bridge_channel->chan;
 
        AST_LIST_TRAVERSE(&bridge->channels, other, entry) {
                if (other == swap) {
                        continue;
                }
-               oldest_linkedid = ast_channel_oldest_linkedid(oldest_linkedid, ast_channel_linkedid(other->chan));
+               oldest_linkedid_chan = ast_channel_internal_oldest_linkedid(
+                       oldest_linkedid_chan, other->chan);
        }
 
-       if (ast_strlen_zero(oldest_linkedid)) {
-               return;
-       }
-
-       ast_channel_linkedid_set(bridge_channel->chan, oldest_linkedid);
+       ast_channel_lock(bridge_channel->chan);
+       ast_channel_internal_copy_linkedid(bridge_channel->chan,
+               oldest_linkedid_chan);
+       ast_channel_unlock(bridge_channel->chan);
        AST_LIST_TRAVERSE(&bridge->channels, other, entry) {
                if (other == swap) {
                        continue;
                }
-               ast_channel_linkedid_set(other->chan, oldest_linkedid);
+               ast_channel_lock(other->chan);
+               ast_channel_internal_copy_linkedid(other->chan,
+                       oldest_linkedid_chan);
+               ast_channel_unlock(other->chan);
        }
 }
 
@@ -253,6 +393,7 @@ void ast_bridge_channel_update_accountcodes(struct ast_bridge_channel *bridge_ch
                if (other == swap) {
                        continue;
                }
+               ast_channel_lock_both(bridge_channel->chan, other->chan);
 
                if (!ast_strlen_zero(ast_channel_accountcode(bridge_channel->chan)) && ast_strlen_zero(ast_channel_peeraccount(other->chan))) {
                        ast_debug(1, "Setting peeraccount to %s for %s from data on channel %s\n",
@@ -286,6 +427,8 @@ void ast_bridge_channel_update_accountcodes(struct ast_bridge_channel *bridge_ch
                                ast_channel_peeraccount_set(bridge_channel->chan, ast_channel_accountcode(other->chan));
                        }
                }
+               ast_channel_unlock(bridge_channel->chan);
+               ast_channel_unlock(other->chan);
        }
 }
 
@@ -336,6 +479,8 @@ void ast_bridge_channel_kick(struct ast_bridge_channel *bridge_channel, int caus
  */
 static int bridge_channel_write_frame(struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
 {
+       ast_assert(frame->frametype != AST_FRAME_BRIDGE_ACTION_SYNC);
+
        ast_bridge_channel_lock_bridge(bridge_channel);
 /*
  * XXX need to implement a deferred write queue for when there
@@ -487,7 +632,8 @@ static void bridge_channel_unsuspend(struct ast_bridge_channel *bridge_channel)
  * \retval 0 on success.
  * \retval -1 on error.
  */
-static int bridge_channel_queue_action_data(struct ast_bridge_channel *bridge_channel, enum bridge_channel_action_type action, const void *data, size_t datalen)
+static int bridge_channel_queue_action_data(struct ast_bridge_channel *bridge_channel,
+       enum bridge_channel_action_type action, const void *data, size_t datalen)
 {
        struct ast_frame frame = {
                .frametype = AST_FRAME_BRIDGE_ACTION,
@@ -501,6 +647,52 @@ static int bridge_channel_queue_action_data(struct ast_bridge_channel *bridge_ch
 
 /*!
  * \internal
+ * \brief Queue an action frame onto the bridge channel with data synchronously.
+ * \since 12.2.0
+ *
+ * The function will not return until the queued frame is freed.
+ *
+ * \param bridge_channel Which channel to queue the frame onto.
+ * \param action Type of bridge action frame.
+ * \param data Frame payload data to pass.
+ * \param datalen Frame payload data length to pass.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int bridge_channel_queue_action_data_sync(struct ast_bridge_channel *bridge_channel,
+       enum bridge_channel_action_type action, const void *data, size_t datalen)
+{
+       struct sync_payload *sync_payload;
+       int sync_payload_len = sizeof(*sync_payload) + datalen;
+       struct bridge_sync sync_struct;
+       struct ast_frame frame = {
+               .frametype = AST_FRAME_BRIDGE_ACTION_SYNC,
+               .subclass.integer = action,
+       };
+
+       /* Make sure we don't end up trying to wait on ourself to deliver the frame */
+       ast_assert(!pthread_equal(pthread_self(), bridge_channel->thread));
+
+       sync_payload = ast_alloca(sync_payload_len);
+       sync_payload->id = ast_atomic_fetchadd_int(&sync_ids, +1);
+       memcpy(sync_payload->data, data, datalen);
+
+       frame.datalen = sync_payload_len;
+       frame.data.ptr = sync_payload;
+
+       bridge_sync_init(&sync_struct, sync_payload->id);
+       if (ast_bridge_channel_queue_frame(bridge_channel, &frame)) {
+               bridge_sync_cleanup(&sync_struct);
+               return -1;
+       }
+
+       bridge_sync_wait(&sync_struct);
+       bridge_sync_cleanup(&sync_struct);
+       return 0;
+}
+/*!
+ * \internal
  * \brief Write an action frame onto the bridge channel with data.
  * \since 12.0.0
  *
@@ -512,7 +704,8 @@ static int bridge_channel_queue_action_data(struct ast_bridge_channel *bridge_ch
  * \retval 0 on success.
  * \retval -1 on error.
  */
-static int bridge_channel_write_action_data(struct ast_bridge_channel *bridge_channel, enum bridge_channel_action_type action, const void *data, size_t datalen)
+static int bridge_channel_write_action_data(struct ast_bridge_channel *bridge_channel,
+       enum bridge_channel_action_type action, const void *data, size_t datalen)
 {
        struct ast_frame frame = {
                .frametype = AST_FRAME_BRIDGE_ACTION,
@@ -524,6 +717,27 @@ static int bridge_channel_write_action_data(struct ast_bridge_channel *bridge_ch
        return bridge_channel_write_frame(bridge_channel, &frame);
 }
 
+static void bridge_frame_free(struct ast_frame *frame)
+{
+       if (frame->frametype == AST_FRAME_BRIDGE_ACTION_SYNC) {
+               struct sync_payload *sync_payload = frame->data.ptr;
+               struct bridge_sync *sync;
+
+               AST_RWLIST_RDLOCK(&sync_structs);
+               AST_RWLIST_TRAVERSE(&sync_structs, sync, list) {
+                       if (sync->id == sync_payload->id) {
+                               break;
+                       }
+               }
+               if (sync) {
+                       bridge_sync_signal(sync);
+               }
+               AST_RWLIST_UNLOCK(&sync_structs);
+       }
+
+       ast_frfree(frame);
+}
+
 int ast_bridge_channel_queue_frame(struct ast_bridge_channel *bridge_channel, struct ast_frame *fr)
 {
        struct ast_frame *dup;
@@ -551,7 +765,7 @@ int ast_bridge_channel_queue_frame(struct ast_bridge_channel *bridge_channel, st
        if (bridge_channel->state != BRIDGE_CHANNEL_STATE_WAIT) {
                /* Drop frames on channels leaving the bridge. */
                ast_bridge_channel_unlock(bridge_channel);
-               ast_frfree(dup);
+               bridge_frame_free(dup);
                return 0;
        }
 
@@ -624,14 +838,18 @@ int ast_bridge_channel_write_hold(struct ast_bridge_channel *bridge_channel, con
                datalen = 0;
        }
 
+       ast_channel_lock(bridge_channel->chan);
        ast_channel_publish_blob(bridge_channel->chan, ast_channel_hold_type(), blob);
+       ast_channel_unlock(bridge_channel->chan);
        return ast_bridge_channel_write_control_data(bridge_channel, AST_CONTROL_HOLD,
                moh_class, datalen);
 }
 
 int ast_bridge_channel_write_unhold(struct ast_bridge_channel *bridge_channel)
 {
+       ast_channel_lock(bridge_channel->chan);
        ast_channel_publish_blob(bridge_channel->chan, ast_channel_unhold_type(), NULL);
+       ast_channel_unlock(bridge_channel->chan);
        return ast_bridge_channel_write_control_data(bridge_channel, AST_CONTROL_UNHOLD, NULL, 0);
 }
 
@@ -806,7 +1024,7 @@ static int payload_helper_playfile(ast_bridge_channel_post_action_data post_it,
        size_t len_payload = sizeof(*payload) + len_name + len_moh;
 
        /* Fill in play file frame data. */
-       payload = alloca(len_payload);
+       payload = ast_alloca(len_payload);
        payload->custom_play = custom_play;
        payload->moh_offset = len_moh ? len_name : 0;
        strcpy(payload->playfile, playfile);/* Safe */
@@ -829,6 +1047,13 @@ int ast_bridge_channel_queue_playfile(struct ast_bridge_channel *bridge_channel,
                bridge_channel, custom_play, playfile, moh_class);
 }
 
+int ast_bridge_channel_queue_playfile_sync(struct ast_bridge_channel *bridge_channel,
+               ast_bridge_custom_play_fn custom_play, const char *playfile, const char *moh_class)
+{
+       return payload_helper_playfile(bridge_channel_queue_action_data_sync,
+               bridge_channel, custom_play, playfile, moh_class);
+}
+
 struct bridge_custom_callback {
        /*! Call this function on the bridge channel thread. */
        ast_bridge_custom_callback_fn callback;
@@ -1379,53 +1604,55 @@ static void bridge_channel_attended_transfer(struct ast_bridge_channel *bridge_c
  *
  * \param bridge_channel Channel to execute the action on.
  * \param action What to do.
+ * \param data data from the action.
  *
  * \return Nothing
  */
-static void bridge_channel_handle_action(struct ast_bridge_channel *bridge_channel, struct ast_frame *action)
+static void bridge_channel_handle_action(struct ast_bridge_channel *bridge_channel,
+       enum bridge_channel_action_type action, void *data)
 {
-       switch (action->subclass.integer) {
+       switch (action) {
        case BRIDGE_CHANNEL_ACTION_DTMF_STREAM:
                bridge_channel_suspend(bridge_channel);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
-               bridge_channel_dtmf_stream(bridge_channel, action->data.ptr);
+               bridge_channel_dtmf_stream(bridge_channel, data);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
                bridge_channel_unsuspend(bridge_channel);
                break;
        case BRIDGE_CHANNEL_ACTION_TALKING_START:
        case BRIDGE_CHANNEL_ACTION_TALKING_STOP:
                bridge_channel_talking(bridge_channel,
-                       action->subclass.integer == BRIDGE_CHANNEL_ACTION_TALKING_START);
+                       action == BRIDGE_CHANNEL_ACTION_TALKING_START);
                break;
        case BRIDGE_CHANNEL_ACTION_PLAY_FILE:
                bridge_channel_suspend(bridge_channel);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
-               bridge_channel_playfile(bridge_channel, action->data.ptr);
+               bridge_channel_playfile(bridge_channel, data);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
                bridge_channel_unsuspend(bridge_channel);
                break;
        case BRIDGE_CHANNEL_ACTION_RUN_APP:
                bridge_channel_suspend(bridge_channel);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
-               bridge_channel_run_app(bridge_channel, action->data.ptr);
+               bridge_channel_run_app(bridge_channel, data);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
                bridge_channel_unsuspend(bridge_channel);
                break;
        case BRIDGE_CHANNEL_ACTION_CALLBACK:
-               bridge_channel_do_callback(bridge_channel, action->data.ptr);
+               bridge_channel_do_callback(bridge_channel, data);
                break;
        case BRIDGE_CHANNEL_ACTION_PARK:
                bridge_channel_suspend(bridge_channel);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
-               bridge_channel_park(bridge_channel, action->data.ptr);
+               bridge_channel_park(bridge_channel, data);
                ast_indicate(bridge_channel->chan, AST_CONTROL_SRCUPDATE);
                bridge_channel_unsuspend(bridge_channel);
                break;
        case BRIDGE_CHANNEL_ACTION_BLIND_TRANSFER:
-               bridge_channel_blind_transfer(bridge_channel, action->data.ptr);
+               bridge_channel_blind_transfer(bridge_channel, data);
                break;
        case BRIDGE_CHANNEL_ACTION_ATTENDED_TRANSFER:
-               bridge_channel_attended_transfer(bridge_channel, action->data.ptr);
+               bridge_channel_attended_transfer(bridge_channel, data);
                break;
        default:
                break;
@@ -1690,6 +1917,7 @@ static void bridge_channel_handle_write(struct ast_bridge_channel *bridge_channe
 {
        struct ast_frame *fr;
        char nudge;
+       struct sync_payload *sync_payload;
 
        ast_bridge_channel_lock(bridge_channel);
        if (read(bridge_channel->alert_pipe[0], &nudge, sizeof(nudge)) < 0) {
@@ -1705,7 +1933,11 @@ static void bridge_channel_handle_write(struct ast_bridge_channel *bridge_channe
        }
        switch (fr->frametype) {
        case AST_FRAME_BRIDGE_ACTION:
-               bridge_channel_handle_action(bridge_channel, fr);
+               bridge_channel_handle_action(bridge_channel, fr->subclass.integer, fr->data.ptr);
+               break;
+       case AST_FRAME_BRIDGE_ACTION_SYNC:
+               sync_payload = fr->data.ptr;
+               bridge_channel_handle_action(bridge_channel, fr->subclass.integer, sync_payload->data);
                break;
        case AST_FRAME_CONTROL:
                bridge_channel_handle_control(bridge_channel, fr);
@@ -1718,7 +1950,7 @@ static void bridge_channel_handle_write(struct ast_bridge_channel *bridge_channe
                ast_write(bridge_channel->chan, fr);
                break;
        }
-       ast_frfree(fr);
+       bridge_frame_free(fr);
 }
 
 /*! \brief Internal function to handle DTMF from a channel */
@@ -1735,7 +1967,7 @@ static struct ast_frame *bridge_handle_dtmf(struct ast_bridge_channel *bridge_ch
        if (hook) {
                enum ast_frame_type frametype = frame->frametype;
 
-               ast_frfree(frame);
+               bridge_frame_free(frame);
                frame = NULL;
 
                ao2_ref(hook, -1);
@@ -1795,7 +2027,7 @@ static void bridge_handle_trip(struct ast_bridge_channel *bridge_channel)
                switch (frame->subclass.integer) {
                case AST_CONTROL_HANGUP:
                        ast_bridge_channel_kick(bridge_channel, 0);
-                       ast_frfree(frame);
+                       bridge_frame_free(frame);
                        return;
                default:
                        break;
@@ -1808,7 +2040,7 @@ static void bridge_handle_trip(struct ast_bridge_channel *bridge_channel)
                        return;
                }
                if (!bridge_channel->features->dtmf_passthrough) {
-                       ast_frfree(frame);
+                       bridge_frame_free(frame);
                        return;
                }
                break;
@@ -1818,7 +2050,7 @@ static void bridge_handle_trip(struct ast_bridge_channel *bridge_channel)
 
        /* Simply write the frame out to the bridge technology. */
        bridge_channel_write_frame(bridge_channel, frame);
-       ast_frfree(frame);
+       bridge_frame_free(frame);
 }
 
 /*!
@@ -2195,7 +2427,7 @@ static void bridge_channel_destroy(void *obj)
 
        /* Flush any unhandled wr_queue frames. */
        while ((fr = AST_LIST_REMOVE_HEAD(&bridge_channel->wr_queue, frame_list))) {
-               ast_frfree(fr);
+               bridge_frame_free(fr);
        }
        pipe_close(bridge_channel->alert_pipe);