app_queue: track masquerades in app_queue to avoid leaked stasis subscriptions
authorNathan Bruning <nathan@iperity.com>
Wed, 8 Apr 2020 23:41:55 +0000 (01:41 +0200)
committerJoshua Colp <jcolp@sangoma.com>
Wed, 6 May 2020 09:10:26 +0000 (04:10 -0500)
Add a new "masquarade" channel event, and use it in app_queue to track unique id's.

Testcase is submitted as https://gerrit.asterisk.org/c/testsuite/+/14210

ASTERISK-28829 #close
ASTERISK-25844 #close

Change-Id: Ifc5f9f9fd70903f3c6e49738d3bc632b085d2df6

apps/app_queue.c
include/asterisk/stasis_channels.h
main/channel.c
main/stasis_channels.c

index 31b9a18..6ac5d89 100644 (file)
@@ -6449,6 +6449,33 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub,
        remove_stasis_subscriptions(queue_data);
 }
 
+static void handle_masquerade(void *userdata, struct stasis_subscription *sub,
+               struct stasis_message *msg)
+{
+       struct queue_stasis_data *queue_data = userdata;
+       struct ast_channel_blob *channel_blob = stasis_message_data(msg);
+       const char *new_channel_id;
+
+       new_channel_id = ast_json_string_get(ast_json_object_get(channel_blob->blob, "newchanneluniqueid"));
+
+       ao2_lock(queue_data);
+
+       if (queue_data->dying) {
+               ao2_unlock(queue_data);
+               return;
+       }
+
+       if (!strcmp(channel_blob->snapshot->base->uniqueid, queue_data->caller_uniqueid)) {
+               ast_debug(1, "Replacing caller channel %s with %s due to masquerade\n", queue_data->caller_uniqueid, new_channel_id);
+               ast_string_field_set(queue_data, caller_uniqueid, new_channel_id);
+       } else if (!strcmp(channel_blob->snapshot->base->uniqueid, queue_data->member_uniqueid)) {
+               ast_debug(1, "Replacing member channel %s with %s due to masquerade\n", queue_data->member_uniqueid, new_channel_id);
+               ast_string_field_set(queue_data, member_uniqueid, new_channel_id);
+       }
+
+       ao2_unlock(queue_data);
+}
+
 /*!
  * \internal
  * \brief Callback for all stasis channel events
@@ -6522,6 +6549,8 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
                        handle_local_optimization_end, queue_data);
        stasis_message_router_add(queue_data->channel_router, ast_channel_hangup_request_type(),
                        handle_hangup, queue_data);
+       stasis_message_router_add(queue_data->channel_router, ast_channel_masquerade_type(),
+                       handle_masquerade, queue_data);
        stasis_message_router_set_default(queue_data->channel_router,
                        queue_channel_cb, queue_data);
 
index c90470a..9c47984 100644 (file)
@@ -502,6 +502,14 @@ struct stasis_message_type *ast_channel_varset_type(void);
 struct stasis_message_type *ast_channel_hangup_request_type(void);
 
 /*!
+ * \since 16
+ * \brief Message type for when a channel is being masqueraded
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_channel_masquerade_type(void);
+
+/*!
  * \since 12
  * \brief Message type for when DTMF begins on a channel.
  *
index 821d8fd..7424b81 100644 (file)
@@ -10691,6 +10691,7 @@ AST_MUTEX_DEFINE_STATIC(channel_move_lock);
 
 int ast_channel_move(struct ast_channel *dest, struct ast_channel *source)
 {
+       RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
        SCOPED_MUTEX(lock, &channel_move_lock);
 
        if (dest == source) {
@@ -10715,6 +10716,10 @@ int ast_channel_move(struct ast_channel *dest, struct ast_channel *source)
        ast_channel_masq_set(dest, source);
        ast_channel_masqr_set(source, dest);
 
+       blob = ast_json_pack("{s: s}",
+                       "newchanneluniqueid", ast_channel_uniqueid(dest));
+       ast_channel_publish_blob(source, ast_channel_masquerade_type(), blob);
+
        ast_channel_unlock(dest);
        ast_channel_unlock(source);
 
index 12c8f44..805525f 100644 (file)
@@ -1587,6 +1587,7 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type,
 STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type,
        .to_json = hangup_request_to_json,
        );
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_masquerade_type);
 STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_begin_type);
 STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_end_type,
        .to_json = dtmf_end_to_json,
@@ -1635,6 +1636,7 @@ static void stasis_channels_cleanup(void)
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_masquerade_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hold_type);
@@ -1684,6 +1686,7 @@ int ast_stasis_channels_init(void)
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
+       res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_masquerade_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type);