ARI: Add ability to raise arbitrary User Events
authorScott Griepentrog <sgriepentrog@digium.com>
Thu, 22 May 2014 16:09:51 +0000 (16:09 +0000)
committerScott Griepentrog <sgriepentrog@digium.com>
Thu, 22 May 2014 16:09:51 +0000 (16:09 +0000)
User events can now be generated from ARI.  Events can be signalled with
arbitrary json variables, and include one or more of channel, bridge, or
endpoint snapshots.  An application must be specified which will receive
the event message (other applications can subscribe to it).  The message
will also be delivered via AMI provided a channel is attached.  Dialplan
generated user event messages are still transmitted via the channel, and
will only be received by a stasis application they are attached to or if
the channel is subscribed to.

This change also introduces the multi object blob mechanism used to send
multiple snapshot types in a single message.  The dialplan app UserEvent
was also changed to use multi object blob, and a new stasis message type
created to handle them.

ASTERISK-22697 #close
Review: https://reviewboard.asterisk.org/r/3494/
........

Merged revisions 414405 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@414406 65c4cc65-6c06-0410-ace0-fbb531ad65f3

17 files changed:
CHANGES
apps/app_userevent.c
include/asterisk/stasis.h
include/asterisk/stasis_app.h
include/asterisk/stasis_channels.h
main/manager_channels.c
main/stasis.c
main/stasis_channels.c
main/stasis_endpoints.c
res/ari/ari_model_validators.c
res/ari/ari_model_validators.h
res/ari/resource_events.c
res/ari/resource_events.h
res/res_ari_events.c
res/res_stasis.c
res/stasis/app.c
rest-api/api-docs/events.json

diff --git a/CHANGES b/CHANGES
index c5e7bad..a67af2c 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -33,6 +33,15 @@ ARI
    a channel's ARI control queue until they are stopped. They also can not
    be rewound or fastforwarded.
 
+ * User events can now be generated from ARI.  Events can be signalled with
+   arbitrary json variables, and include one or more of channel, bridge, or
+   endpoint snapshots.  An application must be specified which will receive
+   the event message (other applications can subscribe to it).  The message
+   will also be delivered via AMI provided a channel is attached.  Dialplan
+   generated user event messages are still transmitted via the channel, and
+   will only be received by a stasis application they are attached to or if
+   the channel is subscribed to.
+
 chan_sip
 -----------
  * SIP peers can now specify 'trust_id_outbound' which affects RPID/PAI
index f5defd4..8f7219e 100644 (file)
@@ -115,7 +115,7 @@ static int userevent_exec(struct ast_channel *chan, const char *data)
        }
 
        ast_channel_lock(chan);
-       ast_channel_publish_blob(chan, ast_channel_user_event_type(), blob);
+       ast_multi_object_blob_single_channel_publish(chan, ast_multi_user_event_type(), blob);
        ast_channel_unlock(chan);
        return 0;
 }
index f5b4a60..4c4052c 100644 (file)
@@ -1032,6 +1032,77 @@ struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struc
  */
 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
 
+/*!
+ * \brief Object type code for multi user object snapshots
+ */
+enum stasis_user_multi_object_snapshot_type {
+       STASIS_UMOS_CHANNEL = 0,     /*!< Channel Snapshots */
+       STASIS_UMOS_BRIDGE,          /*!< Bridge Snapshots */
+       STASIS_UMOS_ENDPOINT,        /*!< Endpoint Snapshots */
+};
+
+/*! \brief Number of snapshot types */
+#define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
+
+/*!
+ * \brief Message type for custom user defined events with multi object blobs
+ * \return The stasis_message_type for user event
+ * \since 12.3.0
+ */
+struct stasis_message_type *ast_multi_user_event_type(void);
+
+/*!
+ * \brief Create a stasis multi object blob
+ * \since 12.3.0
+ *
+ * \details
+ * Multi object blob can store a combination of arbitrary json values
+ * (the blob) and also snapshots of various other system objects (such
+ * as channels, bridges, etc) for delivery through a stasis message.
+ * The multi object blob is first created, then optionally objects
+ * are added to it, before being attached to a message and delivered
+ * to stasis topic.
+ *
+ * \param blob Json blob
+ *
+ * \note When used for an ast_multi_user_event_type message, the
+ * json blob should contain at minimum {eventname: name}.
+ *
+ * \retval ast_multi_object_blob* if succeeded
+ * \retval NULL if creation failed
+ */
+struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
+
+/*!
+ * \brief Add an object to a multi object blob previously created
+ * \since 12.3.0
+ *
+ * \param multi The multi object blob previously created
+ * \param type Type code for the object such as channel, bridge, etc.
+ * \param object Snapshot object of the type supplied to typename
+ *
+ * \return Nothing
+ */
+void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
+
+/*!
+ * \brief Create and publish a stasis message blob on a channel with it's snapshot
+ * \since 12.3.0
+ *
+ * \details
+ * For compatibility with app_userevent, this creates a multi object
+ * blob message, attaches the channel snapshot to it, and publishes it
+ * to the channel's topic.
+ *
+ * \param chan The channel to snapshot and publish event to
+ * \param type The message type
+ * \param blob A json blob to publish with the snapshot
+ *
+ * \return Nothing
+ */
+void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
+
+
 /*! @} */
 
 /*! @{ */
index 02c67fd..334155a 100644 (file)
@@ -228,6 +228,33 @@ void stasis_app_unregister_event_source(struct stasis_app_event_source *obj);
  */
 void stasis_app_unregister_event_sources(void);
 
+/*! \brief Return code for stasis_app_user_event */
+enum stasis_app_user_event_res {
+       STASIS_APP_USER_OK,
+       STASIS_APP_USER_APP_NOT_FOUND,
+       STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND,
+       STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME,
+       STASIS_APP_USER_USEREVENT_INVALID,
+       STASIS_APP_USER_INTERNAL_ERROR,
+};
+
+/*!
+ * \brief Generate a Userevent for stasis app (echo to AMI)
+ *
+ * \param app_name Name of the application to generate event for/to.
+ * \param event_name Name of the Userevent.
+ * \param source_uris URIs for the source objects to attach to event.
+ * \param sources_count Array size of source_uris.
+ * \param userevent_data Custom parameters for the user event
+ * \param userevents_count Array size of userevent_data
+ *
+ * \return \ref stasis_app_user_event_res return code.
+ */
+enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
+       const char *event_name,
+       const char **source_uris, int sources_count,
+       struct ast_json *json_variables);
+
 
 /*! \brief Return code for stasis_app_[un]subscribe */
 enum stasis_app_subscribe_res {
@@ -591,6 +618,13 @@ void stasis_app_control_publish(
        struct stasis_app_control *control, struct stasis_message *message);
 
 /*!
+ * \brief Returns the stasis topic for an app
+ *
+ * \param app Stasis app to get topic of
+ */
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app);
+
+/*!
  * \brief Queue a control frame without payload.
  *
  * \param control Control to publish to.
index 9c64c1c..4b1da75 100644 (file)
@@ -381,14 +381,6 @@ struct stasis_message_type *ast_channel_varset_type(void);
 
 /*!
  * \since 12
- * \brief Message type for when a custom user event is sent on a channel.
- *
- * \retval A stasis message type
- */
-struct stasis_message_type *ast_channel_user_event_type(void);
-
-/*!
- * \since 12
  * \brief Message type for when a hangup is requested on a channel.
  *
  * \retval A stasis message type
index 507e2c9..9a15353 100644 (file)
@@ -629,54 +629,6 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
        }
 }
 
-static int userevent_exclusion_cb(const char *key)
-{
-       if (!strcmp("type", key)) {
-               return 1;
-       }
-       if (!strcmp("eventname", key)) {
-               return 1;
-       }
-       return 0;
-}
-
-static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_message *message)
-{
-       struct ast_channel_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
-       RAII_VAR(struct ast_str *, body, NULL, ast_free);
-       const char *eventname;
-
-       eventname = ast_json_string_get(ast_json_object_get(obj->blob, "eventname"));
-       body = ast_manager_str_from_json_object(obj->blob, userevent_exclusion_cb);
-       channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
-
-       if (!channel_event_string || !body) {
-               return;
-       }
-
-       /*** DOCUMENTATION
-               <managerEventInstance>
-                       <synopsis>A user defined event raised from the dialplan.</synopsis>
-                       <syntax>
-                               <channel_snapshot/>
-                               <parameter name="UserEvent">
-                                       <para>The event name, as specified in the dialplan.</para>
-                               </parameter>
-                       </syntax>
-                       <see-also>
-                               <ref type="application">UserEvent</ref>
-                       </see-also>
-               </managerEventInstance>
-       ***/
-       manager_event(EVENT_FLAG_USER, "UserEvent",
-                     "%s"
-                     "UserEvent: %s\r\n"
-                     "%s",
-                     ast_str_buffer(channel_event_string), eventname, ast_str_buffer(body));
-}
-
 static void publish_basic_channel_event(const char *event, int class, struct ast_channel_snapshot *snapshot)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -1160,9 +1112,6 @@ int manager_channels_init(void)
                ast_channel_snapshot_type(), channel_snapshot_update, NULL);
 
        ret |= stasis_message_router_add(message_router,
-               ast_channel_user_event_type(), channel_user_event_cb, NULL);
-
-       ret |= stasis_message_router_add(message_router,
                ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL);
 
        ret |= stasis_message_router_add(message_router,
index 5eca791..a451e7b 100644 (file)
@@ -38,6 +38,29 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
 #include "asterisk/vector.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_bridges.h"
+#include "asterisk/stasis_endpoints.h"
+
+/*** DOCUMENTATION
+       <managerEvent language="en_US" name="UserEvent">
+               <managerEventInstance class="EVENT_FLAG_USER">
+                       <synopsis>A user defined event raised from the dialplan.</synopsis>
+                       <syntax>
+                               <channel_snapshot/>
+                               <parameter name="UserEvent">
+                                       <para>The event name, as specified in the dialplan.</para>
+                               </parameter>
+                       </syntax>
+                       <description>
+                               <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
+                       </description>
+                       <see-also>
+                               <ref type="application">UserEvent</ref>
+                       </see-also>
+               </managerEventInstance>
+       </managerEvent>
+***/
 
 /*!
  * \page stasis-impl Stasis Implementation Notes
@@ -974,10 +997,241 @@ void stasis_log_bad_type_access(const char *name)
        ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
 }
 
+/*! \brief A multi object blob data structure to carry user event stasis messages */
+struct ast_multi_object_blob {
+       struct ast_json *blob;                             /*< A blob of JSON data */
+       AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
+};
+
+/*!
+ * \internal
+ * \brief Destructor for \ref ast_multi_object_blob objects
+ */
+static void multi_object_blob_dtor(void *obj)
+{
+       struct ast_multi_object_blob *multi = obj;
+       int type;
+       int i;
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+                       ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
+               }
+               AST_VECTOR_FREE(&multi->snapshots[type]);
+       }
+       ast_json_unref(multi->blob);
+}
+
+/*! \brief Create a stasis user event multi object blob */
+struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
+{
+       int type;
+       RAII_VAR(struct ast_multi_object_blob *, multi,
+                       ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
+                       ao2_cleanup);
+
+       ast_assert(blob != NULL);
+
+       if (!multi) {
+               return NULL;
+       }
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
+                       return NULL;
+               }
+       }
+
+       multi->blob = ast_json_ref(blob);
+
+       ao2_ref(multi, +1);
+       return multi;
+}
+
+/*! \brief Add an object (snapshot) to the blob */
+void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
+       enum stasis_user_multi_object_snapshot_type type, void *object)
+{
+       if (!multi || !object) {
+               return;
+       }
+       AST_VECTOR_APPEND(&multi->snapshots[type],object);
+}
+
+/*! \brief Publish single channel user event (for app_userevent compatibility) */
+void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
+       struct stasis_message_type *type, struct ast_json *blob)
+{
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
+
+       multi = ast_multi_object_blob_create(blob);
+       if (!multi) {
+               return;
+       }
+
+       channel_snapshot = ast_channel_snapshot_create(chan);
+       ao2_ref(channel_snapshot, +1);
+       ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
+
+       message = stasis_message_create(type, multi);
+       if (message) {
+               /* app_userevent still publishes to channel */
+               stasis_publish(ast_channel_topic(chan), message);
+       }
+}
+
+/*! \internal \brief convert multi object blob to ari json */
+static struct ast_json *multi_user_event_to_json(
+       struct stasis_message *message,
+       const struct stasis_message_sanitizer *sanitize)
+{
+       RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
+       struct ast_multi_object_blob *multi = stasis_message_data(message);
+       struct ast_json *blob = multi->blob;
+       const struct timeval *tv = stasis_message_timestamp(message);
+       enum stasis_user_multi_object_snapshot_type type;
+       int i;
+
+       out = ast_json_object_create();
+       if (!out) {
+               return NULL;
+       }
+
+       ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
+       ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
+       ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
+       ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+                       struct ast_json *json_object = NULL;
+                       char *name = NULL;
+                       void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
+
+                       switch (type) {
+                       case STASIS_UMOS_CHANNEL:
+                               json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
+                               name = "channel";
+                               break;
+                       case STASIS_UMOS_BRIDGE:
+                               json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
+                               name = "bridge";
+                               break;
+                       case STASIS_UMOS_ENDPOINT:
+                               json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
+                               name = "endpoint";
+                               break;
+                       }
+                       if (json_object) {
+                               ast_json_object_set(out, name, json_object);
+                       }
+               }
+       }
+       return ast_json_ref(out);
+}
+
+/*! \internal \brief convert multi object blob to ami string */
+static struct ast_str *multi_object_blob_to_ami(void *obj)
+{
+       struct ast_str *ami_str=ast_str_create(1024);
+       struct ast_str *ami_snapshot;
+       const struct ast_multi_object_blob *multi = obj;
+       enum stasis_user_multi_object_snapshot_type type;
+       int i;
+
+       if (!ami_str) {
+               return NULL;
+       }
+       if (!multi) {
+               ast_free(ami_str);
+               return NULL;
+       }
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+                       char *name = "";
+                       void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
+                       ami_snapshot = NULL;
+
+                       if (i > 0) {
+                               ast_asprintf(&name, "%d", i + 1);
+                       }
+
+                       switch (type) {
+                       case STASIS_UMOS_CHANNEL:
+                               ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
+                               break;
+
+                       case STASIS_UMOS_BRIDGE:
+                               ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
+                               break;
+
+                       case STASIS_UMOS_ENDPOINT:
+                               /* currently not sending endpoint snapshots to AMI */
+                               break;
+                       }
+                       if (ami_snapshot) {
+                               ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
+                               ast_free(ami_snapshot);
+                       }
+               }
+       }
+
+       return ami_str;
+}
+
+/*! \internal \brief Callback to pass only user defined parameters from blob */
+static int userevent_exclusion_cb(const char *key)
+{
+       if (!strcmp("eventname", key)) {
+               return 1;
+       }
+       return 0;
+}
+
+static struct ast_manager_event_blob *multi_user_event_to_ami(
+       struct stasis_message *message)
+{
+       RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
+       RAII_VAR(struct ast_str *, body, NULL, ast_free);
+       struct ast_multi_object_blob *multi = stasis_message_data(message);
+       const char *eventname;
+
+       eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
+       body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
+       object_string = multi_object_blob_to_ami(multi);
+       if (!object_string || !body) {
+               return NULL;
+       }
+
+       return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
+               "%s"
+               "UserEvent: %s\r\n"
+               "%s",
+               ast_str_buffer(object_string),
+               eventname,
+               ast_str_buffer(body));
+}
+
+
+/*!
+ * @{ \brief Define multi user event message type(s).
+ */
+
+STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
+       .to_json = multi_user_event_to_json,
+       .to_ami = multi_user_event_to_ami,
+       );
+
+/*! @} */
+
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
        STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
 }
 
 int stasis_init(void)
@@ -995,6 +1249,10 @@ int stasis_init(void)
        if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
                return -1;
        }
+       if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
+               return -1;
+       }
 
        return 0;
 }
+
index 3522fb2..49fd3f9 100644 (file)
@@ -916,28 +916,6 @@ static struct ast_json *dtmf_end_to_json(
                "channel", json_channel);
 }
 
-static struct ast_json *user_event_to_json(
-       struct stasis_message *message,
-       const struct stasis_message_sanitizer *sanitize)
-{
-       struct ast_channel_blob *channel_blob = stasis_message_data(message);
-       struct ast_json *blob = channel_blob->blob;
-       struct ast_channel_snapshot *snapshot = channel_blob->snapshot;
-       const struct timeval *tv = stasis_message_timestamp(message);
-       struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, sanitize);
-
-       if (!json_channel) {
-               return NULL;
-       }
-
-       return ast_json_pack("{s: s, s: o, s: O, s: O, s: o}",
-               "type", "ChannelUserevent",
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "eventname", ast_json_object_get(blob, "eventname"),
-               "userevent", blob,
-               "channel", json_channel);
-}
-
 static struct ast_json *varset_to_json(
        struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
@@ -1007,9 +985,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type,
        .to_ami = varset_to_ami,
        .to_json = varset_to_json,
        );
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_user_event_type,
-       .to_json = user_event_to_json,
-       );
 STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type,
        .to_json = hangup_request_to_json,
        );
@@ -1048,7 +1023,6 @@ static void stasis_channels_cleanup(void)
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
-       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_user_event_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
@@ -1097,7 +1071,6 @@ int ast_stasis_channels_init(void)
 
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
-       res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
        res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
index 3f8d324..e3f5a3f 100644 (file)
@@ -193,7 +193,11 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
        struct ast_endpoint_snapshot *snapshot;
 
-       ast_asprintf(&id, "%s/%s", tech, name);
+       if (ast_strlen_zero(name)) {
+               ast_asprintf(&id, "%s", tech);
+       } else {
+               ast_asprintf(&id, "%s/%s", tech, name);
+       }
        if (!id) {
                return NULL;
        }
index 5c24b9c..fa38155 100644 (file)
@@ -3076,7 +3076,6 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
        struct ast_json_iter *iter;
        int has_type = 0;
        int has_application = 0;
-       int has_channel = 0;
        int has_eventname = 0;
        int has_userevent = 0;
 
@@ -3110,9 +3109,17 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
                                res = 0;
                        }
                } else
+               if (strcmp("bridge", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       prop_is_valid = ast_ari_validate_bridge(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI ChannelUserevent field bridge failed validation\n");
+                               res = 0;
+                       }
+               } else
                if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) {
                        int prop_is_valid;
-                       has_channel = 1;
                        prop_is_valid = ast_ari_validate_channel(
                                ast_json_object_iter_value(iter));
                        if (!prop_is_valid) {
@@ -3120,6 +3127,15 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
                                res = 0;
                        }
                } else
+               if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       prop_is_valid = ast_ari_validate_endpoint(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI ChannelUserevent field endpoint failed validation\n");
+                               res = 0;
+                       }
+               } else
                if (strcmp("eventname", ast_json_object_iter_key(iter)) == 0) {
                        int prop_is_valid;
                        has_eventname = 1;
@@ -3158,11 +3174,6 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
                res = 0;
        }
 
-       if (!has_channel) {
-               ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field channel\n");
-               res = 0;
-       }
-
        if (!has_eventname) {
                ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field eventname\n");
                res = 0;
index 7214a58..a4512a1 100644 (file)
@@ -1278,7 +1278,9 @@ ari_validator ast_ari_validate_application_fn(void);
  * - type: string (required)
  * - application: string (required)
  * - timestamp: Date
- * - channel: Channel (required)
+ * - bridge: Bridge
+ * - channel: Channel
+ * - endpoint: Endpoint
  * - eventname: string (required)
  * - userevent: object (required)
  * ChannelVarset
index 098049f..d159741 100644 (file)
@@ -217,3 +217,59 @@ void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session *
                ast_json_unref(msg);
        }
 }
+
+void ast_ari_events_user_event(struct ast_variable *headers,
+       struct ast_ari_events_user_event_args *args,
+       struct ast_ari_response *response)
+{
+       enum stasis_app_user_event_res res;
+       struct ast_json *json_variables = NULL;
+
+       if (args->variables) {
+               ast_ari_events_user_event_parse_body(args->variables, args);
+               json_variables = ast_json_object_get(args->variables, "variables");
+       }
+
+       if (ast_strlen_zero(args->application)) {
+               ast_ari_response_error(response, 400, "Bad Request",
+                       "Missing parameter application");
+               return;
+       }
+
+       res = stasis_app_user_event(args->application,
+               args->event_name,
+               args->source, args->source_count,
+               json_variables);
+
+       switch (res) {
+       case STASIS_APP_USER_OK:
+               ast_ari_response_no_content(response);
+               break;
+
+       case STASIS_APP_USER_APP_NOT_FOUND:
+               ast_ari_response_error(response, 404, "Not Found",
+                       "Application not found");
+               break;
+
+       case STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND:
+               ast_ari_response_error(response, 422, "Unprocessable Entity",
+                       "Event source was not found");
+               break;
+
+       case STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME:
+               ast_ari_response_error(response, 400, "Bad Request",
+                       "Invalid event source URI scheme");
+               break;
+
+       case STASIS_APP_USER_USEREVENT_INVALID:
+               ast_ari_response_error(response, 400, "Bad Request",
+                       "Invalid userevnet data");
+               break;
+
+       case STASIS_APP_USER_INTERNAL_ERROR:
+       default:
+               ast_ari_response_error(response, 500, "Internal Server Error",
+                       "Error processing request");
+       }
+}
+
index 96ee5b3..0807709 100644 (file)
@@ -56,5 +56,39 @@ struct ast_ari_events_event_websocket_args {
  * \param args Swagger parameters.
  */
 void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session *session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args);
+/*! Argument struct for ast_ari_events_user_event() */
+struct ast_ari_events_user_event_args {
+       /*! Event name */
+       const char *event_name;
+       /*! The name of the application that will receive this event */
+       const char *application;
+       /*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName} */
+       const char **source;
+       /*! Length of source array. */
+       size_t source_count;
+       /*! Parsing context for source. */
+       char *source_parse;
+       /*! custom key/value pairs added to the user event */
+       struct ast_json *variables;
+};
+/*!
+ * \brief Body parsing function for /events/user/{eventName}.
+ * \param body The JSON body from which to parse parameters.
+ * \param[out] args The args structure to parse into.
+ * \retval zero on success
+ * \retval non-zero on failure
+ */
+int ast_ari_events_user_event_parse_body(
+       struct ast_json *body,
+       struct ast_ari_events_user_event_args *args);
+
+/*!
+ * \brief Generate a user event.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void ast_ari_events_user_event(struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response);
 
 #endif /* _ASTERISK_RESOURCE_EVENTS_H */
index f5a3fac..4e56789 100644 (file)
@@ -149,15 +149,203 @@ fin: __attribute__((unused))
        ast_free(args.app_parse);
        ast_free(args.app);
 }
+int ast_ari_events_user_event_parse_body(
+       struct ast_json *body,
+       struct ast_ari_events_user_event_args *args)
+{
+       struct ast_json *field;
+       /* Parse query parameters out of it */
+       field = ast_json_object_get(body, "application");
+       if (field) {
+               args->application = ast_json_string_get(field);
+       }
+       field = ast_json_object_get(body, "source");
+       if (field) {
+               /* If they were silly enough to both pass in a query param and a
+                * JSON body, free up the query value.
+                */
+               ast_free(args->source);
+               if (ast_json_typeof(field) == AST_JSON_ARRAY) {
+                       /* Multiple param passed as array */
+                       size_t i;
+                       args->source_count = ast_json_array_size(field);
+                       args->source = ast_malloc(sizeof(*args->source) * args->source_count);
+
+                       if (!args->source) {
+                               return -1;
+                       }
+
+                       for (i = 0; i < args->source_count; ++i) {
+                               args->source[i] = ast_json_string_get(ast_json_array_get(field, i));
+                       }
+               } else {
+                       /* Multiple param passed as single value */
+                       args->source_count = 1;
+                       args->source = ast_malloc(sizeof(*args->source) * args->source_count);
+                       if (!args->source) {
+                               return -1;
+                       }
+                       args->source[0] = ast_json_string_get(field);
+               }
+       }
+       return 0;
+}
+
+/*!
+ * \brief Parameter parsing callback for /events/user/{eventName}.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void ast_ari_events_user_event_cb(
+       struct ast_tcptls_session_instance *ser,
+       struct ast_variable *get_params, struct ast_variable *path_vars,
+       struct ast_variable *headers, struct ast_ari_response *response)
+{
+       struct ast_ari_events_user_event_args args = {};
+       struct ast_variable *i;
+       RAII_VAR(struct ast_json *, body, NULL, ast_json_unref);
+#if defined(AST_DEVMODE)
+       int is_valid;
+       int code;
+#endif /* AST_DEVMODE */
+
+       for (i = get_params; i; i = i->next) {
+               if (strcmp(i->name, "application") == 0) {
+                       args.application = (i->value);
+               } else
+               if (strcmp(i->name, "source") == 0) {
+                       /* Parse comma separated list */
+                       char *vals[MAX_VALS];
+                       size_t j;
+
+                       args.source_parse = ast_strdup(i->value);
+                       if (!args.source_parse) {
+                               ast_ari_response_alloc_failed(response);
+                               goto fin;
+                       }
+
+                       if (strlen(args.source_parse) == 0) {
+                               /* ast_app_separate_args can't handle "" */
+                               args.source_count = 1;
+                               vals[0] = args.source_parse;
+                       } else {
+                               args.source_count = ast_app_separate_args(
+                                       args.source_parse, ',', vals,
+                                       ARRAY_LEN(vals));
+                       }
+
+                       if (args.source_count == 0) {
+                               ast_ari_response_alloc_failed(response);
+                               goto fin;
+                       }
+
+                       if (args.source_count >= MAX_VALS) {
+                               ast_ari_response_error(response, 400,
+                                       "Bad Request",
+                                       "Too many values for source");
+                               goto fin;
+                       }
+
+                       args.source = ast_malloc(sizeof(*args.source) * args.source_count);
+                       if (!args.source) {
+                               ast_ari_response_alloc_failed(response);
+                               goto fin;
+                       }
+
+                       for (j = 0; j < args.source_count; ++j) {
+                               args.source[j] = (vals[j]);
+                       }
+               } else
+               {}
+       }
+       for (i = path_vars; i; i = i->next) {
+               if (strcmp(i->name, "eventName") == 0) {
+                       args.event_name = (i->value);
+               } else
+               {}
+       }
+       /* Look for a JSON request entity */
+       body = ast_http_get_json(ser, headers);
+       if (!body) {
+               switch (errno) {
+               case EFBIG:
+                       ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large");
+                       goto fin;
+               case ENOMEM:
+                       ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request");
+                       goto fin;
+               case EIO:
+                       ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body");
+                       goto fin;
+               }
+       }
+       args.variables = ast_json_ref(body);
+       ast_ari_events_user_event(headers, &args, response);
+#if defined(AST_DEVMODE)
+       code = response->response_code;
+
+       switch (code) {
+       case 0: /* Implementation is still a stub, or the code wasn't set */
+               is_valid = response->message == NULL;
+               break;
+       case 500: /* Internal Server Error */
+       case 501: /* Not Implemented */
+       case 404: /* Application does not exist. */
+       case 422: /* Event source not found. */
+       case 400: /* Invalid even tsource URI or userevent data. */
+               is_valid = 1;
+               break;
+       default:
+               if (200 <= code && code <= 299) {
+                       is_valid = ast_ari_validate_void(
+                               response->message);
+               } else {
+                       ast_log(LOG_ERROR, "Invalid error response %d for /events/user/{eventName}\n", code);
+                       is_valid = 0;
+               }
+       }
+
+       if (!is_valid) {
+               ast_log(LOG_ERROR, "Response validation failed for /events/user/{eventName}\n");
+               ast_ari_response_error(response, 500,
+                       "Internal Server Error", "Response validation failed");
+       }
+#endif /* AST_DEVMODE */
+
+fin: __attribute__((unused))
+       ast_free(args.source_parse);
+       ast_free(args.source);
+       return;
+}
 
 /*! \brief REST handler for /api-docs/events.{format} */
-static struct stasis_rest_handlers events = {
-       .path_segment = "events",
+static struct stasis_rest_handlers events_user_eventName = {
+       .path_segment = "eventName",
+       .is_wildcard = 1,
        .callbacks = {
+               [AST_HTTP_POST] = ast_ari_events_user_event_cb,
        },
        .num_children = 0,
        .children = {  }
 };
+/*! \brief REST handler for /api-docs/events.{format} */
+static struct stasis_rest_handlers events_user = {
+       .path_segment = "user",
+       .callbacks = {
+       },
+       .num_children = 1,
+       .children = { &events_user_eventName, }
+};
+/*! \brief REST handler for /api-docs/events.{format} */
+static struct stasis_rest_handlers events = {
+       .path_segment = "events",
+       .callbacks = {
+       },
+       .num_children = 1,
+       .children = { &events_user, }
+};
 
 static int load_module(void)
 {
index d9542cd..0184d20 100644 (file)
@@ -61,6 +61,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_app_impl.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_bridges.h"
+#include "asterisk/stasis_endpoints.h"
 #include "asterisk/stasis_message_router.h"
 #include "asterisk/strings.h"
 #include "stasis/app.h"
@@ -1310,6 +1311,89 @@ enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
                json, app_unsubscribe);
 }
 
+enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
+       const char *event_name,
+       const char **source_uris, int sources_count,
+       struct ast_json *json_variables)
+{
+       RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
+       RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+       RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
+       RAII_VAR(void *, obj, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+       enum stasis_app_subscribe_res res = STASIS_APP_USER_INTERNAL_ERROR;
+       struct ast_json *json_value;
+       int have_channel = 0;
+       int i;
+
+       if (!app) {
+               ast_log(LOG_WARNING, "App %s not found\n", app_name);
+               return STASIS_APP_USER_APP_NOT_FOUND;
+       }
+
+       blob = json_variables;
+       if (!blob) {
+               blob = ast_json_pack("{}");
+       }
+       json_value = ast_json_string_create(event_name);
+       if (!json_value) {
+               ast_log(LOG_ERROR, "unable to create json string\n");
+               return res;
+       }
+       if (ast_json_object_set(blob, "eventname", json_value)) {
+               ast_log(LOG_ERROR, "unable to set eventname to blob\n");
+               return res;
+       }
+
+       multi = ast_multi_object_blob_create(blob);
+
+       for (i = 0; i < sources_count; ++i) {
+               const char *uri = source_uris[i];
+               void *snapshot=NULL;
+               enum stasis_user_multi_object_snapshot_type type;
+
+               if (ast_begins_with(uri, "channel:")) {
+                       type = STASIS_UMOS_CHANNEL;
+                       snapshot = ast_channel_snapshot_get_latest(uri + 8);
+                       have_channel = 1;
+               } else if (ast_begins_with(uri, "bridge:")) {
+                       type = STASIS_UMOS_BRIDGE;
+                       snapshot = ast_bridge_snapshot_get_latest(uri + 7);
+               } else if (ast_begins_with(uri, "endpoint:")) {
+                       type = STASIS_UMOS_ENDPOINT;
+                       snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
+               } else {
+                       ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
+                       return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
+               }
+               if (!snapshot) {
+                       ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
+                       return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
+               }
+               ast_multi_object_blob_add(multi, type, snapshot);
+       }
+
+       message = stasis_message_create(ast_multi_user_event_type(), multi);
+       if (!message) {
+               ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
+               return res;
+       }
+
+       /*
+        * Publishing to two different topics is normally to be avoided -- except
+        * in this case both are final destinations with no forwards (only listeners).
+        * The message has to be delivered to the application topic for ARI, but a
+        * copy is also delivered directly to the manager for AMI if there is a channel.
+        */
+       stasis_publish(ast_app_get_topic(app), message);
+
+       if (have_channel) {
+               stasis_publish(ast_manager_get_topic(), message);
+       }
+
+       return STASIS_APP_USER_OK;
+}
+
 void stasis_app_ref(void)
 {
        ast_module_ref(ast_module_info->self);
index 9fcf848..4dcb635 100644 (file)
@@ -795,6 +795,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
        return app;
 }
 
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+       return app->topic;
+}
+
 /*!
  * \brief Send a message to the given application.
  * \param app App to send the message to.
index 20c8423..f19b0b1 100644 (file)
                                        ]
                                }
                        ]
+               },
+               {
+                       "path": "/events/user/{eventName}",
+                       "description": "Stasis application user events",
+                       "operations": [
+                               {
+                                       "httpMethod": "POST",
+                                       "summary": "Generate a user event.",
+                                       "nickname": "userEvent",
+                                       "responseClass": "void",
+                                       "parameters": [
+                                               {
+                                                       "name": "eventName",
+                                                       "description": "Event name",
+                                                       "paramType": "path",
+                                                       "required": true,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "application",
+                                                       "description": "The name of the application that will receive this event",
+                                                       "paramType": "query",
+                                                       "required": true,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "source",
+                                                       "description": "URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName}",
+                                                       "paramType": "query",
+                                                       "required": false,
+                                                       "allowMultiple": true,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "variables",
+                                                       "description": "custom key/value pairs added to the user event",
+                                                       "paramType": "body",
+                                                       "required": false,
+                                                       "allowMultiple": false,
+                                                       "dataType": "containers"
+                                               }
+                                       ],
+                                       "errorResponses": [
+                                               {
+                                                       "code": 404,
+                                                       "reason": "Application does not exist."
+                                               },
+                                               {
+                                                       "code": 422,
+                                                       "reason": "Event source not found."
+                                               },
+                                               {
+                                                       "code": 400,
+                                                       "reason": "Invalid even tsource URI or userevent data."
+                                               }
+                                       ]
+                               }
+                       ]
                }
        ],
        "models": {
                                        "description": "The name of the user event."
                                },
                                "channel": {
-                                       "required": true,
+                                       "required": false,
                                        "type": "Channel",
-                                       "description": "The channel that signaled the user event."
+                                       "description": "A channel that is signaled with the user event."
+                               },
+                               "bridge": {
+                                       "required": false,
+                                       "type": "Bridge",
+                                       "description": "A bridge that is signaled with the user event."
+                               },
+                               "endpoint": {
+                                       "required": false,
+                                       "type": "Endpoint",
+                                       "description": "A endpoint that is signaled with the user event."
                                },
                                "userevent": {
                                        "required": true,