ARI/res_stasis: Subscribe to both Local channel halves when originating to app
authorMatthew Jordan <mjordan@digium.com>
Mon, 7 Jul 2014 02:15:00 +0000 (02:15 +0000)
committerMatthew Jordan <mjordan@digium.com>
Mon, 7 Jul 2014 02:15:00 +0000 (02:15 +0000)
This patch fixes two bugs:

1. When originating a channel into a Stasis application, we already create a
   subscription for the channel that is going into our Stasis app.
   Unfortunately, when you create a Local channel and pass it off to a Stasis
   app, you really aren't creating just one channel: you're creating two. This
   patch snags the second half of the Local channel pair (assuming it is a
   Local channel pair, but luckily core_local is kind about such assumptions)
   and subscribes to it as well.

2. Subscriptions are a bit sticky right now. If a subscription is made, the
   'interest' count gets bumped on the Stasis subscription - but unless
   something explicitly unsubscribes the channel, said subscription sticks
   around. This is not much of a problem is a user is creating the subscription
   - if they made it, they must want it. However, when we are creating
   implicit subscriptions, we need to make sure something clears them out.
   This patch takes a pessimistic approach: it watches the cache updates
   coming from Stasis and, if we notice that the cache just cleared out an
   object, we delete our subscription object. This keeps our ao2 container of
   Stasis forwards in an application from growing out of hand; it also is a
   bit more forgiving for end users who may not realize they were supposed to
   unsubscribe from that channel that just hung up.

Review: https://reviewboard.asterisk.org/r/3710/
#ASTERISK-23939 #close
........

Merged revisions 418089 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@418090 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/stasis_app.h
res/ari/resource_channels.c
res/res_stasis.c
res/stasis/app.c

index 334155a..a7b2040 100644 (file)
@@ -297,6 +297,21 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
        const char **event_source_uris, int event_sources_count,
        struct ast_json **json);
 
+/*!
+ * \brief Directly subscribe an application to a channel
+ *
+ * \param app_name Name of the application to subscribe.
+ * \param chan The channel to subscribe to
+ *
+ * \return \ref stasis_app_subscribe_res return code.
+ *
+ * \note This method can be used when you already hold a channel and its
+ *       lock. This bypasses the channel lookup that would normally be
+ *       performed by \ref stasis_app_subscribe.
+ */
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+       struct ast_channel *chan);
+
 /*! @} */
 
 /*! @{ */
index 6cc00ce..3936092 100644 (file)
@@ -42,6 +42,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_app_snoop.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/causes.h"
+#include "asterisk/core_local.h"
 #include "resource_channels.h"
 
 #include <limits.h>
@@ -775,6 +776,7 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
        struct ast_format tmp_fmt;
        char *stuff;
        struct ast_channel *chan;
+       struct ast_channel *local_peer;
        RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
        struct ast_assigned_ids assignedids = {
                .uniqueid = args_channel_id,
@@ -859,20 +861,24 @@ static void ari_channels_handle_originate_with_id(const char *args_endpoint,
                return;
        }
 
-       snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
-       ast_channel_unlock(chan);
+       /* See if this is a Local channel and if so, get the peer */
+       local_peer = ast_local_get_peer(chan);
 
        if (!ast_strlen_zero(args_app)) {
-               /* channel: + channel ID + null terminator */
-               char uri[9 + strlen(ast_channel_uniqueid(chan))];
-               const char *uris[1] = { uri, };
-
-               sprintf(uri, "channel:%s", ast_channel_uniqueid(chan));
-               stasis_app_subscribe(args_app, uris, 1, NULL);
+               stasis_app_subscribe_channel(args_app, chan);
+               if (local_peer) {
+                       stasis_app_subscribe_channel(args_app, local_peer);
+               }
        }
 
+       snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
+       ast_channel_unlock(chan);
+
        ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL));
        ast_channel_unref(chan);
+       if (local_peer) {
+               ast_channel_unref(local_peer);
+       }
 }
 
 void ast_ari_channels_originate_with_id(struct ast_variable *headers,
index 0184d20..ff74245 100644 (file)
@@ -1225,6 +1225,29 @@ static enum stasis_app_subscribe_res app_handle_subscriptions(
        return STASIS_ASR_OK;
 }
 
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+       struct ast_channel *chan)
+{
+       RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
+       int res;
+
+       if (!app) {
+               return STASIS_ASR_APP_NOT_FOUND;
+       }
+
+       ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
+
+       res = app_subscribe_channel(app, chan);
+       if (res != 0) {
+               ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+                       app_name, ast_channel_uniqueid(chan));
+               return STASIS_ASR_INTERNAL_ERROR;
+       }
+
+       return STASIS_ASR_OK;
+}
+
+
 /*!
  * \internal
  * \brief Subscribe an app to an event source.
index 4dcb635..41f6ccf 100644 (file)
@@ -36,6 +36,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stasis_message_router.h"
 
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
+
 struct stasis_app {
        /*! Aggregation topic for this application. */
        struct stasis_topic *topic;
@@ -449,7 +451,7 @@ static struct ast_json *channel_callerid(
 static channel_snapshot_monitor channel_monitors[] = {
        channel_state,
        channel_dialplan,
-       channel_callerid
+       channel_callerid,
 };
 
 static void sub_channel_update_handler(void *data,
@@ -486,6 +488,10 @@ static void sub_channel_update_handler(void *data,
                        app_send(app, msg);
                }
        }
+
+       if (!new_snapshot && old_snapshot) {
+               unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+       }
 }
 
 static struct ast_json *simple_endpoint_event(
@@ -513,6 +519,7 @@ static void sub_endpoint_update_handler(void *data,
        struct stasis_app *app = data;
        struct stasis_cache_update *update;
        struct ast_endpoint_snapshot *new_snapshot;
+       struct ast_endpoint_snapshot *old_snapshot;
        const struct timeval *tv;
 
        ast_assert(stasis_message_type(message) == stasis_cache_update_type());
@@ -522,17 +529,22 @@ static void sub_endpoint_update_handler(void *data,
        ast_assert(update->type == ast_endpoint_snapshot_type());
 
        new_snapshot = stasis_message_data(update->new_snapshot);
-       tv = update->new_snapshot ?
-               stasis_message_timestamp(update->new_snapshot) :
-               stasis_message_timestamp(message);
+       old_snapshot = stasis_message_data(update->old_snapshot);
 
-       json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+       if (new_snapshot) {
+               tv = stasis_message_timestamp(update->new_snapshot);
 
-       if (!json) {
-               return;
+               json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+               if (!json) {
+                       return;
+               }
+
+               app_send(app, json);
        }
 
-       app_send(app, json);
+       if (!new_snapshot && old_snapshot) {
+               unsubscribe(app, "endpoint", old_snapshot->id, 1);
+       }
 }
 
 static struct ast_json *simple_bridge_event(
@@ -580,11 +592,13 @@ static void sub_bridge_update_handler(void *data,
                json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
        }
 
-       if (!json) {
-               return;
+       if (json) {
+               app_send(app, json);
        }
 
-       app_send(app, json);
+       if (!new_snapshot && old_snapshot) {
+               unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+       }
 }
 
 
@@ -982,7 +996,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
        return app_subscribe_channel(app, obj);
 }
 
-static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
 {
        RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(lock, app->forwards);
@@ -997,7 +1011,7 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
        forwards->interested--;
 
        ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
-       if (forwards->interested == 0) {
+       if (forwards->interested == 0 || terminate) {
                /* No one is interested any more; unsubscribe */
                ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
                forwards_unsubscribe(forwards);
@@ -1024,7 +1038,7 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
                return -1;
        }
 
-       return unsubscribe(app, "channel", channel_id);
+       return unsubscribe(app, "channel", channel_id, 0);
 }
 
 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
@@ -1093,7 +1107,7 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
                return -1;
        }
 
-       return unsubscribe(app, "bridge", bridge_id);
+       return unsubscribe(app, "bridge", bridge_id, 0);
 }
 
 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
@@ -1153,7 +1167,7 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
                return -1;
        }
 
-       return unsubscribe(app, "endpoint", endpoint_id);
+       return unsubscribe(app, "endpoint", endpoint_id, 0);
 }
 
 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)