ARI: WebSocket event cleanup
authorDavid M. Lee <dlee@digium.com>
Tue, 27 Aug 2013 19:19:36 +0000 (19:19 +0000)
committerDavid M. Lee <dlee@digium.com>
Tue, 27 Aug 2013 19:19:36 +0000 (19:19 +0000)
Stasis events (which get distributed over the ARI WebSocket) are created
by subscribing to the channel_all_cached and bridge_all_cached topics,
filtering out events for channels/bridges currently subscribed to.

There are two issues with that. First was a race condition, where
messages in-flight to the master subscribe-to-all-things topic would get
sent out, even though the events happened before the channel was put
into Stasis. Secondly, as the number of channels and bridges grow in the
system, the work spent filtering messages becomes excessive.

Since r395954, individual channels and bridges have caching topics, and
can be subscribed to individually. This patch takes advantage, so that
channels and bridges are subscribed to on demand, instead of filtering
the global topics.

The one case where filtering is still required is handling BridgeMerge
messages, which are published directly to the bridge_all topic.

Other than the change to how subscriptions work, this patch mostly just
moves code around. Most of the work generating JSON objects from
messages was moved to .to_json handlers on the message types. The
callback functions handling app subscriptions were moved from res_stasis
(b/c they were global to the model) to stasis/app.c (b/c they are local
to the app now).

(closes issue ASTERISK-21969)
Reported by: Matt Jordan
Review: https://reviewboard.asterisk.org/r/2754/
........

Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3

main/stasis_bridges.c
res/res_ari_asterisk.c
res/res_ari_bridges.c
res/res_ari_events.c
res/res_stasis.c
res/stasis/app.c
res/stasis/app.h
rest-api-templates/param_parsing.mustache
rest-api-templates/res_ari_resource.c.mustache

index 2a79056..be1294a 100644 (file)
@@ -132,6 +132,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
 static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg);
 
 static struct stasis_cp_all *bridge_cache_all;
 
@@ -139,9 +142,12 @@ static struct stasis_cp_all *bridge_cache_all;
  * @{ \brief Define bridge message types.
  */
 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type,
+       .to_json = ast_bridge_merge_message_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+       .to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+       .to_json = ast_channel_left_bridge_to_json);
 STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami);
 STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
 /*! @} */
@@ -307,6 +313,19 @@ static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_b
        return msg;
 }
 
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg)
+{
+       struct ast_bridge_merge_message *merge;
+
+       merge = stasis_message_data(msg);
+
+        return ast_json_pack("{s: s, s: o, s: o, s: o}",
+                "type", "BridgeMerged",
+                "timestamp", ast_json_timeval(*stasis_message_timestamp(msg), NULL),
+                "bridge", ast_bridge_snapshot_to_json(merge->to),
+                "bridge_from", ast_bridge_snapshot_to_json(merge->from));
+}
+
 void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
 {
        RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup);
@@ -417,6 +436,35 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha
        stasis_publish(ast_bridge_topic(bridge), msg);
 }
 
+static struct ast_json *simple_bridge_channel_event(
+        const char *type,
+        struct ast_bridge_snapshot *bridge_snapshot,
+        struct ast_channel_snapshot *channel_snapshot,
+        const struct timeval *tv)
+{
+        return ast_json_pack("{s: s, s: o, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+                "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+       return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+       return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
+}
+
 typedef struct ast_json *(*json_item_serializer_cb)(void *obj);
 
 static struct ast_json *container_to_json_array(struct ao2_container *items, json_item_serializer_cb item_cb)
index 3f34c7a..3f0c285 100644 (file)
@@ -81,8 +81,16 @@ static void ast_ari_get_asterisk_info_cb(
                                goto fin;
                        }
 
-                       args.only_count = ast_app_separate_args(
-                               args.only_parse, ',', vals, ARRAY_LEN(vals));
+                       if (strlen(args.only_parse) == 0) {
+                               /* ast_app_separate_args can't handle "" */
+                               args.only_count = 1;
+                               vals[0] = args.only_parse;
+                       } else {
+                               args.only_count = ast_app_separate_args(
+                                       args.only_parse, ',', vals,
+                                       ARRAY_LEN(vals));
+                       }
+
                        if (args.only_count == 0) {
                                ast_ari_response_alloc_failed(response);
                                goto fin;
index bc8e200..d3b3a64 100644 (file)
@@ -300,8 +300,16 @@ static void ast_ari_add_channel_to_bridge_cb(
                                goto fin;
                        }
 
-                       args.channel_count = ast_app_separate_args(
-                               args.channel_parse, ',', vals, ARRAY_LEN(vals));
+                       if (strlen(args.channel_parse) == 0) {
+                               /* ast_app_separate_args can't handle "" */
+                               args.channel_count = 1;
+                               vals[0] = args.channel_parse;
+                       } else {
+                               args.channel_count = ast_app_separate_args(
+                                       args.channel_parse, ',', vals,
+                                       ARRAY_LEN(vals));
+                       }
+
                        if (args.channel_count == 0) {
                                ast_ari_response_alloc_failed(response);
                                goto fin;
@@ -403,8 +411,16 @@ static void ast_ari_remove_channel_from_bridge_cb(
                                goto fin;
                        }
 
-                       args.channel_count = ast_app_separate_args(
-                               args.channel_parse, ',', vals, ARRAY_LEN(vals));
+                       if (strlen(args.channel_parse) == 0) {
+                               /* ast_app_separate_args can't handle "" */
+                               args.channel_count = 1;
+                               vals[0] = args.channel_parse;
+                       } else {
+                               args.channel_count = ast_app_separate_args(
+                                       args.channel_parse, ',', vals,
+                                       ARRAY_LEN(vals));
+                       }
+
                        if (args.channel_count == 0) {
                                ast_ari_response_alloc_failed(response);
                                goto fin;
index 5cea06f..567167f 100644 (file)
@@ -89,8 +89,16 @@ static void ast_ari_event_websocket_ws_cb(struct ast_websocket *ws_session,
                                goto fin;
                        }
 
-                       args.app_count = ast_app_separate_args(
-                               args.app_parse, ',', vals, ARRAY_LEN(vals));
+                       if (strlen(args.app_parse) == 0) {
+                               /* ast_app_separate_args can't handle "" */
+                               args.app_count = 1;
+                               vals[0] = args.app_parse;
+                       } else {
+                               args.app_count = ast_app_separate_args(
+                                       args.app_parse, ',', vals,
+                                       ARRAY_LEN(vals));
+                       }
+
                        if (args.app_count == 0) {
                                ast_ari_response_alloc_failed(response);
                                goto fin;
@@ -126,14 +134,16 @@ fin: __attribute__((unused))
                 * negotiation. Param parsing should happen earlier, but we
                 * need a way to pass it through the WebSocket code to the
                 * callback */
-               RAII_VAR(char *, msg, NULL, ast_free);
+               RAII_VAR(char *, msg, NULL, ast_json_free);
                if (response->message) {
                        msg = ast_json_dump_string(response->message);
                } else {
-                       msg = ast_strdup("?");
+                       ast_log(LOG_ERROR, "Missing response message\n");
+               }
+               if (msg) {
+                       ast_websocket_write(ws_session,
+                               AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
                }
-               ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg,
-                       strlen(msg));
        }
        ast_free(args.app_parse);
        ast_free(args.app);
index 35c1847..ab2bf5c 100644 (file)
@@ -87,6 +87,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #define CONTROLS_NUM_BUCKETS 127
 
 /*!
+ * \brief Number of buckets for the Stasis bridges hash table.  Remember to
+ * keep it a prime number!
+ */
+#define BRIDGES_NUM_BUCKETS 127
+
+/*!
  * \brief Stasis application container.
  */
 struct ao2_container *apps_registry;
@@ -97,12 +103,6 @@ struct ao2_container *app_bridges;
 
 struct ao2_container *app_bridges_moh;
 
-/*! \brief Message router for the channel caching topic */
-struct stasis_message_router *channel_router;
-
-/*! \brief Message router for the bridge caching topic */
-struct stasis_message_router *bridge_router;
-
 /*! AO2 hash function for \ref app */
 static int app_hash(const void *obj, const int flags)
 {
@@ -153,6 +153,30 @@ static int control_compare(void *lhs, void *rhs, int flags)
        }
 }
 
+static int cleanup_cb(void *obj, void *arg, int flags)
+{
+       struct app *app = obj;
+
+       if (!app_is_finished(app)) {
+               return 0;
+       }
+
+       ast_verb(1, "Shutting down application '%s'\n", app_name(app));
+       app_shutdown(app);
+
+       return CMP_MATCH;
+
+}
+
+/*!
+ * \brief Clean up any old apps that we don't need any more.
+ */
+static void cleanup(void)
+{
+       ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
+               cleanup_cb, NULL);
+}
+
 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
 {
        return control_create(chan);
@@ -435,229 +459,6 @@ struct ast_bridge *stasis_app_bridge_find_by_id(
        return ao2_find(app_bridges, bridge_id, OBJ_KEY);
 }
 
-/*! \brief Typedef for blob handler callbacks */
-typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
-
-/*! \brief Callback to check whether an app is watching a given channel */
-static int app_watching_channel_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-       char *uniqueid = arg;
-
-       return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified channel */
-static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
-{
-       struct ao2_container *watching_apps;
-       char *uniqueid_dup;
-       RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
-       ast_assert(uniqueid != NULL);
-
-       uniqueid_dup = ast_strdupa(uniqueid);
-
-       watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
-       watching_apps = watching_apps_iter->c;
-
-       if (!ao2_container_count(watching_apps)) {
-               return NULL;
-       }
-
-       ao2_ref(watching_apps, +1);
-       return watching_apps_iter->c;
-}
-
-/*! \brief Typedef for callbacks that get called on channel snapshot updates */
-typedef struct ast_json *(*channel_snapshot_monitor)(
-       struct ast_channel_snapshot *old_snapshot,
-       struct ast_channel_snapshot *new_snapshot,
-       const struct timeval *tv);
-
-static struct ast_json *simple_channel_event(
-       const char *type,
-       struct ast_channel_snapshot *snapshot,
-       const struct timeval *tv)
-{
-       return ast_json_pack("{s: s, s: o, s: o}",
-               "type", type,
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "channel", ast_channel_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *channel_created_event(
-       struct ast_channel_snapshot *snapshot,
-       const struct timeval *tv)
-{
-       return simple_channel_event("ChannelCreated", snapshot, tv);
-}
-
-static struct ast_json *channel_destroyed_event(
-       struct ast_channel_snapshot *snapshot,
-       const struct timeval *tv)
-{
-       return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
-               "type", "ChannelDestroyed",
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "cause", snapshot->hangupcause,
-               "cause_txt", ast_cause2str(snapshot->hangupcause),
-               "channel", ast_channel_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *channel_state_change_event(
-       struct ast_channel_snapshot *snapshot,
-       const struct timeval *tv)
-{
-       return simple_channel_event("ChannelStateChange", snapshot, tv);
-}
-
-/*! \brief Handle channel state changes */
-static struct ast_json *channel_state(
-       struct ast_channel_snapshot *old_snapshot,
-       struct ast_channel_snapshot *new_snapshot,
-       const struct timeval *tv)
-{
-       struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
-
-       if (!old_snapshot) {
-               return channel_created_event(snapshot, tv);
-       } else if (!new_snapshot) {
-               return channel_destroyed_event(snapshot, tv);
-       } else if (old_snapshot->state != new_snapshot->state) {
-               return channel_state_change_event(snapshot, tv);
-       }
-
-       return NULL;
-}
-
-static struct ast_json *channel_dialplan(
-       struct ast_channel_snapshot *old_snapshot,
-       struct ast_channel_snapshot *new_snapshot,
-       const struct timeval *tv)
-{
-       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
-
-       /* No Newexten event on cache clear */
-       if (!new_snapshot) {
-               return NULL;
-       }
-
-       /* Empty application is not valid for a Newexten event */
-       if (ast_strlen_zero(new_snapshot->appl)) {
-               return NULL;
-       }
-
-       if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
-               return NULL;
-       }
-
-       return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
-               "type", "ChannelDialplan",
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "dialplan_app", new_snapshot->appl,
-               "dialplan_app_data", new_snapshot->data,
-               "channel", ast_channel_snapshot_to_json(new_snapshot));
-}
-
-static struct ast_json *channel_callerid(
-       struct ast_channel_snapshot *old_snapshot,
-       struct ast_channel_snapshot *new_snapshot,
-       const struct timeval *tv)
-{
-       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
-
-       /* No NewCallerid event on cache clear or first event */
-       if (!old_snapshot || !new_snapshot) {
-               return NULL;
-       }
-
-       if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
-               return NULL;
-       }
-
-       return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
-               "type", "ChannelCallerId",
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "caller_presentation", new_snapshot->caller_pres,
-               "caller_presentation_txt", ast_describe_caller_presentation(
-                       new_snapshot->caller_pres),
-               "channel", ast_channel_snapshot_to_json(new_snapshot));
-}
-
-channel_snapshot_monitor channel_monitors[] = {
-       channel_state,
-       channel_dialplan,
-       channel_callerid
-};
-
-static int app_send_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-       struct ast_json *msg = arg;
-
-       app_send(app, msg);
-       return 0;
-}
-
-static void sub_channel_snapshot_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-       struct stasis_cache_update *update = stasis_message_data(message);
-       struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
-       struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-       /* Pull timestamp from the new snapshot, or from the update message
-        * when there isn't one. */
-       const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-       int i;
-
-       watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
-       if (!watching_apps) {
-               return;
-       }
-
-       for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
-               RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-               msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
-               if (msg) {
-                       ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
-               }
-       }
-}
-
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
-       ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static void sub_channel_blob_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-       RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-       struct ast_channel_blob *obj = stasis_message_data(message);
-
-       if (!obj->snapshot) {
-               return;
-       }
-
-       msg = stasis_message_to_json(message);
-       if (!msg) {
-               return;
-       }
-
-       watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
-       if (!watching_apps) {
-               return;
-       }
-
-       distribute_message(watching_apps, msg);
-}
 
 /*!
  * \brief In addition to running ao2_cleanup(), this function also removes the
@@ -709,7 +510,7 @@ void stasis_app_bridge_destroy(const char *bridge_id)
        ast_bridge_destroy(bridge, 0);
 }
 
-int app_send_start_msg(struct app *app, struct ast_channel *chan,
+static int send_start_msg(struct app *app, struct ast_channel *chan,
        int argc, char *argv[])
 {
        RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -726,8 +527,9 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
                return -1;
        }
 
-       msg = ast_json_pack("{s: s, s: [], s: o}",
+       msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
                "type", "StasisStart",
+               "timestamp", ast_json_timeval(ast_tvnow(), NULL),
                "args",
                "channel", ast_channel_snapshot_to_json(snapshot));
        if (!msg) {
@@ -750,7 +552,7 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan,
        return 0;
 }
 
-int app_send_end_msg(struct app *app, struct ast_channel *chan)
+static int send_end_msg(struct app *app, struct ast_channel *chan)
 {
        RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
        RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -763,8 +565,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan)
                return -1;
        }
 
-       msg = ast_json_pack("{s: s, s: o}",
+       msg = ast_json_pack("{s: s, s: o, s: o}",
                "type", "StasisEnd",
+               "timestamp", ast_json_timeval(ast_tvnow(), NULL),
                "channel", ast_channel_snapshot_to_json(snapshot));
        if (!msg) {
                return -1;
@@ -815,15 +618,17 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
        }
        ao2_link(app_controls, control);
 
-       res = app_send_start_msg(app, chan, argc, argv);
+       res = send_start_msg(app, chan, argc, argv);
        if (res != 0) {
                ast_log(LOG_ERROR,
                        "Error sending start message to '%s'\n", app_name);
-               return res;
+               return -1;
        }
 
-       if (app_add_channel(app, chan)) {
-               ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
+       res = app_subscribe_channel(app, chan);
+       if (res != 0) {
+               ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+                       app_name, ast_channel_name(chan));
                return -1;
        }
 
@@ -831,13 +636,23 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
                RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
                int r;
                int command_count;
+               struct ast_bridge *last_bridge = NULL;
+               struct ast_bridge *bridge = NULL;
 
                /* Check to see if a bridge absorbed our hangup frame */
                if (ast_check_hangup_locked(chan)) {
                        break;
                }
 
-               if (stasis_app_get_bridge(control)) {
+               last_bridge = bridge;
+               bridge = stasis_app_get_bridge(control);
+
+               if (bridge != last_bridge) {
+                       app_unsubscribe_bridge(app, last_bridge);
+                       app_subscribe_bridge(app, bridge);
+               }
+
+               if (bridge) {
                        /* Bridge is handling channel frames */
                        control_wait(control);
                        control_dispatch_all(control, chan);
@@ -882,14 +697,21 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
                }
        }
 
-       app_remove_channel(app, chan);
-       res = app_send_end_msg(app, chan);
+       app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
+       app_unsubscribe_channel(app, chan);
+
+       res = send_end_msg(app, chan);
        if (res != 0) {
                ast_log(LOG_ERROR,
                        "Error sending end message to %s\n", app_name);
                return res;
        }
 
+       /* There's an off chance that app is ready for cleanup. Go ahead
+        * and clean up, just in case
+        */
+       cleanup();
+
        return res;
 }
 
@@ -912,29 +734,6 @@ int stasis_app_send(const char *app_name, struct ast_json *message)
        return 0;
 }
 
-static int cleanup_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-
-       if (!app_is_finished(app)) {
-               return 0;
-       }
-
-       ast_verb(1, "Cleaning up application '%s'\n", app_name(app));
-
-       return CMP_MATCH;
-
-}
-
-/*!
- * \brief Clean up any old apps that we don't need any more.
- */
-static void cleanup(void)
-{
-       ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
-               cleanup_cb, NULL);
-}
-
 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
 {
        RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -994,249 +793,22 @@ void stasis_app_unref(void)
        ast_module_unref(ast_module_info->self);
 }
 
-/*! \brief Callback to check whether an app is watching a given bridge */
-static int app_watching_bridge_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-       char *uniqueid = arg;
-
-       return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified bridge */
-static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
-{
-       struct ao2_container *watching_apps;
-       char *uniqueid_dup;
-       RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
-       ast_assert(uniqueid != NULL);
-
-       uniqueid_dup = ast_strdupa(uniqueid);
-
-       watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
-       watching_apps = watching_apps_iter->c;
-
-       if (!ao2_container_count(watching_apps)) {
-               return NULL;
-       }
-
-       ao2_ref(watching_apps, +1);
-       return watching_apps_iter->c;
-}
-
-/*! Callback used to remove an app's interest in a bridge */
-static int remove_bridge_cb(void *obj, void *arg, int flags)
-{
-       app_remove_bridge(obj, arg);
-       return 0;
-}
-
-static struct ast_json *simple_bridge_event(
-       const char *type,
-       struct ast_bridge_snapshot *snapshot,
-       const struct timeval *tv)
-{
-       return ast_json_pack("{s: s, s: o, s: o}",
-               "type", type,
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "bridge", ast_bridge_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *simple_bridge_channel_event(
-       const char *type,
-       struct ast_bridge_snapshot *bridge_snapshot,
-       struct ast_channel_snapshot *channel_snapshot,
-       const struct timeval *tv)
-{
-       return ast_json_pack("{s: s, s: o, s: o, s: o}",
-               "type", type,
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
-               "channel", ast_channel_snapshot_to_json(channel_snapshot));
-}
-
-static void sub_bridge_snapshot_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-       struct stasis_cache_update *update = stasis_message_data(message);
-       struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
-       struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-       const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-       watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
-       if (!watching_apps || !ao2_container_count(watching_apps)) {
-               return;
-       }
-
-       if (!new_snapshot) {
-               RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
-
-               /* The bridge has gone away. Create the message, make sure no apps are
-                * watching this bridge anymore, and destroy the bridge's control
-                * structure */
-               msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
-               ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
-               stasis_app_bridge_destroy(old_snapshot->uniqueid);
-       } else if (!old_snapshot) {
-               msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
-       }
-
-       if (!msg) {
-               return;
-       }
-
-       distribute_message(watching_apps, msg);
-}
-
-/*! \brief Callback used to merge two containers of applications */
-static int list_merge_cb(void *obj, void *arg, int flags)
-{
-       /* remove any current entries for this app */
-       ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
-       /* relink as the only entry */
-       ao2_link(arg, obj);
-       return 0;
-}
-
-/*! \brief Merge container src into container dst without modifying src */
-static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
-{
-       ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
-}
-
-/*! \brief Callback for adding to an app's bridges of interest */
-static int app_add_bridge_cb(void *obj, void *arg, int flags)
-{
-       app_add_bridge(obj, arg);
-       return 0;
-}
-
-/*! \brief Add interest in the given bridge to all apps in the container */
-static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
-{
-       RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
-       ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
-}
-
-static void sub_bridge_merge_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-       struct ast_bridge_merge_message *merge = stasis_message_data(message);
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-       RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
-       const struct timeval *tv = stasis_message_timestamp(message);
-
-       watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
-       if (watching_apps_to) {
-               update_apps_list(watching_apps_all, watching_apps_to);
-       }
-
-       watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
-       if (watching_apps_from) {
-               update_bridge_interest(watching_apps_from, merge->to->uniqueid);
-               update_apps_list(watching_apps_all, watching_apps_from);
-       }
-
-       if (!ao2_container_count(watching_apps_all)) {
-               return;
-       }
-
-       msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
-               "type", "BridgeMerged",
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "bridge", ast_bridge_snapshot_to_json(merge->to),
-               "bridge_from", ast_bridge_snapshot_to_json(merge->from));
-
-       if (!msg) {
-               return;
-       }
-
-       distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_enter_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-       struct ast_bridge_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-       watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-       if (watching_apps_bridge) {
-               update_apps_list(watching_apps_all, watching_apps_bridge);
-       }
-
-       watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
-       if (watching_apps_channel) {
-               update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
-               update_apps_list(watching_apps_all, watching_apps_channel);
-       }
-
-       if (!ao2_container_count(watching_apps_all)) {
-               return;
-       }
-
-       msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
-               obj->channel, stasis_message_timestamp(message));
-
-       distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_leave_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
-       struct ast_bridge_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-       watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-       if (!watching_apps_bridge) {
-               return;
-       }
-
-       msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
-               obj->channel, stasis_message_timestamp(message));
-
-       distribute_message(watching_apps_bridge, msg);
-}
-
 static int load_module(void)
 {
-       int r = 0;
-
-       apps_registry =
-               ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
+       apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
+               app_compare);
        if (apps_registry == NULL) {
                return AST_MODULE_LOAD_FAILURE;
        }
 
-       app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-                                            control_hash, control_compare);
+       app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
+               control_compare);
        if (app_controls == NULL) {
                return AST_MODULE_LOAD_FAILURE;
        }
 
-       app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-                                            bridges_hash, bridges_compare);
-       if (app_bridges == NULL) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
+        app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
+               bridges_compare);
 
        app_bridges_moh = ao2_container_alloc_hash(
                AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
@@ -1246,52 +818,11 @@ static int load_module(void)
                return AST_MODULE_LOAD_FAILURE;
        }
 
-       channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
-       if (!channel_router) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
-       /* TODO: This could be handled a lot better. Instead of subscribing to
-        * the one caching topic and filtering out messages by channel id, we
-        * should have individual caching topics per-channel, with a shared
-        * back-end cache. That would simplify a lot of what's going on right
-        * here.
-        */
-       r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
-       r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
-       r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
-       r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
-       if (r) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
-       if (!bridge_router) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
-       r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
-       r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
-       r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
-       if (r) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
        return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
-       int r = 0;
-
-       stasis_message_router_unsubscribe_and_join(channel_router);
-       channel_router = NULL;
-
-       stasis_message_router_unsubscribe_and_join(bridge_router);
-       bridge_router = NULL;
-
        ao2_cleanup(apps_registry);
        apps_registry = NULL;
 
@@ -1304,7 +835,7 @@ static int unload_module(void)
        ao2_cleanup(app_bridges_moh);
        app_bridges_moh = NULL;
 
-       return r;
+       return 0;
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
index 6f80ed6..8abe0c1 100644 (file)
@@ -29,132 +29,519 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #include "app.h"
 
+#include "asterisk/callerid.h"
 #include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances.  Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances.  Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
 
 struct app {
+       /*! Aggregation topic for this application. */
+       struct stasis_topic *topic;
+       /*! Router for handling messages forwarded to \a topic. */
+       struct stasis_message_router *router;
+       /*! Subscription to watch for bridge merge messages */
+       struct stasis_subscription *bridge_merge_sub;
+       /*! Container of the channel forwards to this app's topic. */
+       struct ao2_container *forwards;
        /*! Callback function for this application. */
        stasis_app_cb handler;
        /*! Opaque data to hand to callback function. */
        void *data;
-       /*! List of channel identifiers this app instance is interested in */
-       struct ao2_container *channels;
-       /*! List of bridge identifiers this app instance owns */
-       struct ao2_container *bridges;
        /*! Name of the Stasis application */
        char name[];
 };
 
+/*! Subscription info for a particular channel/bridge. */
+struct app_forwards {
+       /*! Count of number of times this channel/bridge has been subscribed */
+       int interested;
+
+       /*! Forward for the regular topic */
+       struct stasis_subscription *topic_forward;
+       /*! Forward for the caching topic */
+       struct stasis_subscription *topic_cached_forward;
+
+       /*! Unique id of the object being forwarded */
+       char id[];
+};
+
+static void forwards_dtor(void *obj)
+{
+       struct app_forwards *forwards = obj;
+
+       ast_assert(forwards->topic_forward == NULL);
+       ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+       stasis_unsubscribe(forwards->topic_forward);
+       forwards->topic_forward = NULL;
+       stasis_unsubscribe(forwards->topic_cached_forward);
+       forwards->topic_cached_forward = NULL;
+}
+
+static struct app_forwards *forwards_create(struct app *app,
+       const char *id)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || ast_strlen_zero(id)) {
+               return NULL;
+       }
+
+       forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+       if (!forwards) {
+               return NULL;
+       }
+
+       strcpy(forwards->id, id);
+
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+/*! Forward a channel's topics to an app */
+static struct app_forwards *forwards_create_channel(struct app *app,
+       struct ast_channel *chan)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || !chan) {
+               return NULL;
+       }
+
+       forwards = forwards_create(app, ast_channel_uniqueid(chan));
+       if (!forwards) {
+               return NULL;
+       }
+
+       forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+               app->topic);
+       if (!forwards->topic_forward) {
+               return NULL;
+       }
+
+       forwards->topic_cached_forward = stasis_forward_all(
+               ast_channel_topic_cached(chan), app->topic);
+       if (!forwards->topic_cached_forward) {
+               /* Half-subscribed is a bad thing */
+               stasis_unsubscribe(forwards->topic_forward);
+               forwards->topic_forward = NULL;
+               return NULL;
+       }
+
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+/*! Forward a bridge's topics to an app */
+static struct app_forwards *forwards_create_bridge(struct app *app,
+       struct ast_bridge *bridge)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || !bridge) {
+               return NULL;
+       }
+
+       forwards = forwards_create(app, bridge->uniqueid);
+       if (!forwards) {
+               return NULL;
+       }
+
+       forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+               app->topic);
+       if (!forwards->topic_forward) {
+               return NULL;
+       }
+
+       forwards->topic_cached_forward = stasis_forward_all(
+               ast_bridge_topic_cached(bridge), app->topic);
+       if (!forwards->topic_cached_forward) {
+               /* Half-subscribed is a bad thing */
+               stasis_unsubscribe(forwards->topic_forward);
+               forwards->topic_forward = NULL;
+               return NULL;
+       }
+
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
+{
+    const struct app_forwards *object_left = obj_left;
+    const struct app_forwards *object_right = obj_right;
+    const char *right_key = obj_right;
+    int cmp;
+
+    switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+    case OBJ_POINTER:
+        right_key = object_right->id;
+        /* Fall through */
+    case OBJ_KEY:
+        cmp = strcmp(object_left->id, right_key);
+        break;
+    case OBJ_PARTIAL_KEY:
+        /*
+         * We could also use a partial key struct containing a length
+         * so strlen() does not get called for every comparison instead.
+         */
+        cmp = strncmp(object_left->id, right_key, strlen(right_key));
+        break;
+    default:
+        /* Sort can only work on something with a full or partial key. */
+        ast_assert(0);
+        cmp = 0;
+        break;
+    }
+    return cmp;
+}
+
 static void app_dtor(void *obj)
 {
        struct app *app = obj;
 
+       ast_verb(1, "Destroying Stasis app %s\n", app->name);
+
+       ast_assert(app->router == NULL);
+       ast_assert(app->bridge_merge_sub == NULL);
+
+       ao2_cleanup(app->topic);
+       app->topic = NULL;
+       ao2_cleanup(app->forwards);
+       app->forwards = NULL;
        ao2_cleanup(app->data);
        app->data = NULL;
-       ao2_cleanup(app->channels);
-       app->channels = NULL;
-       ao2_cleanup(app->bridges);
-       app->bridges = NULL;
 }
 
-struct app *app_create(const char *name, stasis_app_cb handler, void *data)
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+       struct stasis_topic *topic, struct stasis_message *message)
 {
-       RAII_VAR(struct app *, app, NULL, ao2_cleanup);
-       size_t size;
+       struct app *app = data;
+       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
 
-       ast_assert(name != NULL);
-       ast_assert(handler != NULL);
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(app);
+       }
 
-       ast_verb(1, "Creating Stasis app '%s'\n", name);
+       /* By default, send any message that has a JSON representation */
+       json = stasis_message_to_json(message);
+       if (!json) {
+               return;
+       }
 
-       size = sizeof(*app) + strlen(name) + 1;
-       app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
+       app_send(app, json);
+}
 
-       if (!app) {
-               return NULL;
+/*! \brief Typedef for callbacks that get called on channel snapshot updates */
+typedef struct ast_json *(*channel_snapshot_monitor)(
+       struct ast_channel_snapshot *old_snapshot,
+       struct ast_channel_snapshot *new_snapshot,
+       const struct timeval *tv);
+
+static struct ast_json *simple_channel_event(
+       const char *type,
+       struct ast_channel_snapshot *snapshot,
+       const struct timeval *tv)
+{
+       return ast_json_pack("{s: s, s: o, s: o}",
+               "type", type,
+               "timestamp", ast_json_timeval(*tv, NULL),
+               "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_created_event(
+       struct ast_channel_snapshot *snapshot,
+       const struct timeval *tv)
+{
+       return simple_channel_event("ChannelCreated", snapshot, tv);
+}
+
+static struct ast_json *channel_destroyed_event(
+       struct ast_channel_snapshot *snapshot,
+       const struct timeval *tv)
+{
+       return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+               "type", "ChannelDestroyed",
+               "timestamp", ast_json_timeval(*tv, NULL),
+               "cause", snapshot->hangupcause,
+               "cause_txt", ast_cause2str(snapshot->hangupcause),
+               "channel", ast_channel_snapshot_to_json(snapshot));
+}
+
+static struct ast_json *channel_state_change_event(
+       struct ast_channel_snapshot *snapshot,
+       const struct timeval *tv)
+{
+       return simple_channel_event("ChannelStateChange", snapshot, tv);
+}
+
+/*! \brief Handle channel state changes */
+static struct ast_json *channel_state(
+       struct ast_channel_snapshot *old_snapshot,
+       struct ast_channel_snapshot *new_snapshot,
+       const struct timeval *tv)
+{
+       struct ast_channel_snapshot *snapshot = new_snapshot ?
+               new_snapshot : old_snapshot;
+
+       if (!old_snapshot) {
+               return channel_created_event(snapshot, tv);
+       } else if (!new_snapshot) {
+               return channel_destroyed_event(snapshot, tv);
+       } else if (old_snapshot->state != new_snapshot->state) {
+               return channel_state_change_event(snapshot, tv);
        }
 
-       strncpy(app->name, name, size - sizeof(*app));
-       app->handler = handler;
-       ao2_ref(data, +1);
-       app->data = data;
+       return NULL;
+}
 
-       app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
-       if (!app->channels) {
+static struct ast_json *channel_dialplan(
+       struct ast_channel_snapshot *old_snapshot,
+       struct ast_channel_snapshot *new_snapshot,
+       const struct timeval *tv)
+{
+       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+       /* No Newexten event on cache clear or first event */
+       if (!old_snapshot || !new_snapshot) {
                return NULL;
        }
 
-       app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS);
-       if (!app->bridges) {
+       /* Empty application is not valid for a Newexten event */
+       if (ast_strlen_zero(new_snapshot->appl)) {
                return NULL;
        }
 
-       ao2_ref(app, +1);
-       return app;
+       if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
+               return NULL;
+       }
+
+       return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
+               "type", "ChannelDialplan",
+               "timestamp", ast_json_timeval(*tv, NULL),
+               "dialplan_app", new_snapshot->appl,
+               "dialplan_app_data", new_snapshot->data,
+               "channel", ast_channel_snapshot_to_json(new_snapshot));
 }
 
-int app_add_channel(struct app *app, const struct ast_channel *chan)
+static struct ast_json *channel_callerid(
+       struct ast_channel_snapshot *old_snapshot,
+       struct ast_channel_snapshot *new_snapshot,
+       const struct timeval *tv)
 {
-       SCOPED_AO2LOCK(lock, app);
-       const char *uniqueid;
+       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
 
-       ast_assert(app != NULL);
-       ast_assert(chan != NULL);
+       /* No NewCallerid event on cache clear or first event */
+       if (!old_snapshot || !new_snapshot) {
+               return NULL;
+       }
 
-       /* Don't accept new channels in an inactive application */
-       if (!app->handler) {
-               return -1;
+       if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
+               return NULL;
        }
 
-       uniqueid = ast_channel_uniqueid(chan);
-       return ast_str_container_add(app->channels, uniqueid) ? -1 : 0;
+       return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
+               "type", "ChannelCallerId",
+               "timestamp", ast_json_timeval(*tv, NULL),
+               "caller_presentation", new_snapshot->caller_pres,
+               "caller_presentation_txt", ast_describe_caller_presentation(
+                       new_snapshot->caller_pres),
+               "channel", ast_channel_snapshot_to_json(new_snapshot));
 }
 
-void app_remove_channel(struct app* app, const struct ast_channel *chan)
+static channel_snapshot_monitor channel_monitors[] = {
+       channel_state,
+       channel_dialplan,
+       channel_callerid
+};
+
+static void sub_channel_update_handler(void *data,
+                struct stasis_subscription *sub,
+                struct stasis_topic *topic,
+                struct stasis_message *message)
 {
-       SCOPED_AO2LOCK(lock, app);
+       struct app *app = data;
+        struct stasis_cache_update *update;
+        struct ast_channel_snapshot *new_snapshot;
+        struct ast_channel_snapshot *old_snapshot;
+        const struct timeval *tv;
+        int i;
+
+       ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+        update = stasis_message_data(message);
+
+       ast_assert(update->type == ast_channel_snapshot_type());
+
+        new_snapshot = stasis_message_data(update->new_snapshot);
+        old_snapshot = stasis_message_data(update->old_snapshot);
+
+        /* Pull timestamp from the new snapshot, or from the update message
+         * when there isn't one. */
+        tv = update->new_snapshot ?
+               stasis_message_timestamp(update->new_snapshot) :
+               stasis_message_timestamp(message);
 
-       ast_assert(app != NULL);
-       ast_assert(chan != NULL);
+        for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
+                RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
 
-       ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
+                msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
+                if (msg) {
+                        app_send(app, msg);
+                }
+        }
 }
 
-int app_add_bridge(struct app *app, const char *uniqueid)
+static struct ast_json *simple_bridge_event(
+        const char *type,
+        struct ast_bridge_snapshot *snapshot,
+        const struct timeval *tv)
 {
-       SCOPED_AO2LOCK(lock, app);
+        return ast_json_pack("{s: s, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
 
-       ast_assert(app != NULL);
-       ast_assert(uniqueid != NULL);
+static void sub_bridge_update_handler(void *data,
+                struct stasis_subscription *sub,
+                struct stasis_topic *topic,
+                struct stasis_message *message)
+{
+        RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+       struct app *app = data;
+        struct stasis_cache_update *update;
+        struct ast_bridge_snapshot *new_snapshot;
+        struct ast_bridge_snapshot *old_snapshot;
+        const struct timeval *tv;
 
-       /* Don't accept new bridges in an inactive application */
-       if (!app->handler) {
-               return -1;
+       ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+       update = stasis_message_data(message);
+
+       ast_assert(update->type == ast_bridge_snapshot_type());
+
+       new_snapshot = stasis_message_data(update->new_snapshot);
+       old_snapshot = stasis_message_data(update->old_snapshot);
+       tv = update->new_snapshot ?
+               stasis_message_timestamp(update->new_snapshot) :
+               stasis_message_timestamp(message);
+
+        if (!new_snapshot) {
+                json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
+        } else if (!old_snapshot) {
+                json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
+        }
+
+        if (!json) {
+                return;
+        }
+
+        app_send(app, json);
+}
+
+static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
+       struct stasis_topic *topic, struct stasis_message *message)
+{
+       struct app *app = data;
+       struct ast_bridge_merge_message *merge;
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(app);
        }
 
-       return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0;
+       if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
+               return;
+       }
+
+       merge = stasis_message_data(message);
+
+       /* Find out if we're subscribed to either bridge */
+       forwards = ao2_find(app->forwards, merge->from->uniqueid,
+               OBJ_SEARCH_KEY);
+       if (!forwards) {
+               forwards = ao2_find(app->forwards, merge->to->uniqueid,
+                       OBJ_SEARCH_KEY);
+       }
+
+       if (!forwards) {
+               return;
+       }
+
+       /* Forward the message to the app */
+       stasis_forward_message(app->topic, topic, message);
 }
 
-void app_remove_bridge(struct app* app, const char *uniqueid)
+struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
-       SCOPED_AO2LOCK(lock, app);
+       RAII_VAR(struct app *, app, NULL, ao2_cleanup);
+       size_t size;
+       int res = 0;
+
+       ast_assert(name != NULL);
+       ast_assert(handler != NULL);
+
+       ast_verb(1, "Creating Stasis app '%s'\n", name);
+
+       size = sizeof(*app) + strlen(name) + 1;
+       app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
+
+       if (!app) {
+               return NULL;
+       }
+
+       app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+               AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+               forwards_sort, NULL);
+       if (!app->forwards) {
+               return NULL;
+       }
+
+       app->topic = stasis_topic_create(name);
+       if (!app->topic) {
+               return NULL;
+       }
+
+       app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
+               bridge_merge_handler, app);
+       if (!app->bridge_merge_sub) {
+               return NULL;
+       }
+       /* Subscription holds a reference */
+       ao2_ref(app, +1);
+
+       app->router = stasis_message_router_create(app->topic);
+       if (!app->router) {
+               return NULL;
+       }
+
+        res |= stasis_message_router_add_cache_update(app->router,
+               ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
+
+        res |= stasis_message_router_add_cache_update(app->router,
+               ast_channel_snapshot_type(), sub_channel_update_handler, app);
+
+       res |= stasis_message_router_set_default(app->router,
+               sub_default_handler, app);
 
-       ast_assert(app != NULL);
-       ast_assert(uniqueid != NULL);
+       if (res != 0) {
+               return NULL;
+       }
+       /* Router holds a reference */
+       ao2_ref(app, +1);
+
+       strncpy(app->name, name, size - sizeof(*app));
+       app->handler = handler;
+       ao2_ref(data, +1);
+       app->data = data;
 
-       ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE);
+       ao2_ref(app, +1);
+       return app;
 }
 
 /*!
@@ -196,6 +583,18 @@ void app_deactivate(struct app *app)
        app->data = NULL;
 }
 
+void app_shutdown(struct app *app)
+{
+       SCOPED_AO2LOCK(lock, app);
+
+       ast_assert(app_is_finished(app));
+
+       stasis_message_router_unsubscribe(app->router);
+       app->router = NULL;
+       stasis_unsubscribe(app->bridge_merge_sub);
+       app->bridge_merge_sub = NULL;
+}
+
 int app_is_active(struct app *app)
 {
        SCOPED_AO2LOCK(lock, app);
@@ -206,8 +605,7 @@ int app_is_finished(struct app *app)
 {
        SCOPED_AO2LOCK(lock, app);
 
-       return app->handler == NULL &&
-               ao2_container_count(app->channels) == 0;
+       return app->handler == NULL && ao2_container_count(app->forwards) == 0;
 }
 
 void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -229,7 +627,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data)
                ast_verb(1, "Activating Stasis app '%s'\n", app->name);
        }
 
-
        app->handler = handler;
        ao2_cleanup(app->data);
        if (data) {
@@ -243,16 +640,100 @@ const char *app_name(const struct app *app)
        return app->name;
 }
 
-int app_is_watching_channel(struct app *app, const char *uniqueid)
+int app_subscribe_channel(struct app *app, struct ast_channel *chan)
 {
-       RAII_VAR(char *, found, NULL, ao2_cleanup);
-       found = ao2_find(app->channels, uniqueid, OBJ_KEY);
-       return found != NULL;
+       int res;
+
+       if (!app || !chan) {
+               return -1;
+       } else {
+               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+               SCOPED_AO2LOCK(lock, app->forwards);
+
+               forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
+                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
+               if (!forwards) {
+                       /* Forwards not found, create one */
+                       forwards = forwards_create_channel(app, chan);
+                       if (!forwards) {
+                               return -1;
+                       }
+
+                       res = ao2_link_flags(app->forwards, forwards,
+                               OBJ_NOLOCK);
+                       if (!res) {
+                               return -1;
+                       }
+               }
+
+               ++forwards->interested;
+               return 0;
+       }
+}
+
+static int unsubscribe(struct app *app, const char *kind, const char *id)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+       SCOPED_AO2LOCK(lock, app->forwards);
+
+       forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!forwards) {
+               ast_log(LOG_ERROR,
+                       "App '%s' not subscribed to %s '%s'",
+                       app->name, kind, id);
+               return -1;
+       }
+
+       if (--forwards->interested == 0) {
+               /* No one is interested any more; unsubscribe */
+               forwards_unsubscribe(forwards);
+               ao2_find(app->forwards, forwards,
+                       OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
+                       OBJ_NODATA);
+       }
+
+       return 0;
 }
 
-int app_is_watching_bridge(struct app *app, const char *uniqueid)
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
 {
-       RAII_VAR(char *, found, NULL, ao2_cleanup);
-       found = ao2_find(app->bridges, uniqueid, OBJ_KEY);
-       return found != NULL;
+       if (!app || !chan) {
+               return -1;
+       }
+
+       return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+}
+
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+       if (!app || !bridge) {
+               return -1;
+       } else {
+               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+               SCOPED_AO2LOCK(lock, app->forwards);
+
+               forwards = ao2_find(app->forwards, bridge->uniqueid,
+                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+               if (!forwards) {
+                       /* Forwards not found, create one */
+                       forwards = forwards_create_bridge(app, bridge);
+                       if (!forwards) {
+                               return -1;
+                       }
+                       ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+               }
+
+               ++forwards->interested;
+               return 0;
+       }
+}
+
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+       if (!app || !bridge) {
+               return -1;
+       }
+
+       return unsubscribe(app, "bridge", bridge->uniqueid);
 }
index 0cf9221..5f9f1d7 100644 (file)
@@ -48,6 +48,15 @@ struct app;
 struct app *app_create(const char *name, stasis_app_cb handler, void *data);
 
 /*!
+ * \brief Tears down an application.
+ *
+ * It should be finished before calling this.
+ *
+ * \param app Application to unsubscribe.
+ */
+void app_shutdown(struct app *app);
+
+/*!
  * \brief Deactivates an application.
  *
  * Any channels currently in the application remain active (since the app might
@@ -96,17 +105,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data);
 const char *app_name(const struct app *app);
 
 /*!
- * \brief Subscribe an application to a topic.
- *
- * \param app Application.
- * \param topic Topic to subscribe to.
- * \return New subscription.
- * \return \c NULL on error.
- */
-struct stasis_subscription *app_subscribe(struct app *app,
-       struct stasis_topic *topic);
-
-/*!
  * \brief Send a message to an application.
  *
  * \param app Application.
@@ -114,83 +112,44 @@ struct stasis_subscription *app_subscribe(struct app *app,
  */
 void app_send(struct app *app, struct ast_json *message);
 
-/*!
- * \brief Send the start message to an application.
- *
- * \param app Application.
- * \param chan The channel entering the application.
- * \param argc The number of arguments for the application.
- * \param argv The arguments for the application.
- * \return 0 on success.
- * \return Non-zero on error.
- */
-int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc,
-       char *argv[]);
+struct app_forwards;
 
 /*!
- * \brief Send the end message to an application.
+ * \brief Subscribes an application to a channel.
  *
  * \param app Application.
- * \param chan The channel leaving the application.
+ * \param chan Channel to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_send_end_msg(struct app *app, struct ast_channel *chan);
+int app_subscribe_channel(struct app *app, struct ast_channel *chan);
 
 /*!
- * \brief Checks if an application is watching a given channel.
+ * \brief Cancel the subscription an app has for a channel.
  *
- * \param app Application.
- * \param uniqueid Uniqueid of the channel to check about.
- * \return True (non-zero) if \a app is watching channel with given \a uniqueid
- * \return False (zero) if \a app isn't.
+ * \param app Subscribing application.
+ * \param forwards Returned object from app_subscribe_channel().
  */
-int app_is_watching_channel(struct app *app, const char *uniqueid);
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
 
 /*!
- * \brief Add a channel to an application's watch list.
+ * \brief Add a bridge subscription to an existing channel subscription.
  *
  * \param app Application.
- * \param chan Channel to watch.
+ * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_add_channel(struct app *app, const struct ast_channel *chan);
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
 
 /*!
- * \brief Remove a channel from an application's watch list.
+ * \brief Cancel the bridge subscription for an application.
  *
- * \param app Application.
- * \param chan Channel to watch.
- */
-void app_remove_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Add a bridge to an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to watch.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_add_bridge(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Remove a bridge from an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to remove.
- */
-void app_remove_bridge(struct app* app, const char *uniqueid);
-
-/*!
- * \brief Checks if an application is watching a given bridge.
- *
- * \param app Application.
- * \param uniqueid Uniqueid of the bridge to check.
- * \return True (non-zero) if \a app is watching bridge with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_bridge(struct app *app, const char *uniqueid);
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
 
 #endif /* _ASTERISK_RES_STASIS_APP_H */
index 59c59e9..aabd728 100644 (file)
                                goto fin;
                        }
 
-                       args.{{c_name}}_count = ast_app_separate_args(
-                               args.{{c_name}}_parse, ',', vals, ARRAY_LEN(vals));
+                       if (strlen(args.{{c_name}}_parse) == 0) {
+                               /* ast_app_separate_args can't handle "" */
+                               args.{{c_name}}_count = 1;
+                               vals[0] = args.{{c_name}}_parse;
+                       } else {
+                               args.{{c_name}}_count = ast_app_separate_args(
+                                       args.{{c_name}}_parse, ',', vals,
+                                       ARRAY_LEN(vals));
+                       }
+
                        if (args.{{c_name}}_count == 0) {
                                ast_ari_response_alloc_failed(response);
                                goto fin;
index 906d55f..e6b2a88 100644 (file)
@@ -174,14 +174,16 @@ fin: __attribute__((unused))
                 * negotiation. Param parsing should happen earlier, but we
                 * need a way to pass it through the WebSocket code to the
                 * callback */
-               RAII_VAR(char *, msg, NULL, ast_free);
+               RAII_VAR(char *, msg, NULL, ast_json_free);
                if (response->message) {
                        msg = ast_json_dump_string(response->message);
                } else {
-                       msg = ast_strdup("?");
+                       ast_log(LOG_ERROR, "Missing response message\n");
+               }
+               if (msg) {
+                       ast_websocket_write(ws_session,
+                               AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
                }
-               ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg,
-                       strlen(msg));
        }
 {{> param_cleanup}}
 }