Multiple revisions 420089-420090,420097
authorMatthew Jordan <mjordan@digium.com>
Tue, 5 Aug 2014 21:44:09 +0000 (21:44 +0000)
committerMatthew Jordan <mjordan@digium.com>
Tue, 5 Aug 2014 21:44:09 +0000 (21:44 +0000)
........
  r420089 | mjordan | 2014-08-05 15:10:52 -0500 (Tue, 05 Aug 2014) | 72 lines

  ARI: Add channel technology agnostic out of call text messaging

  This patch adds the ability to send and receive text messages from various
  technology stacks in Asterisk through ARI. This includes chan_sip (sip),
  res_pjsip_messaging (pjsip), and res_xmpp (xmpp). Messages are sent using the
  endpoints resource, and can be sent directly through that resource, or to a
  particular endpoint.

  For example, the following would send the message "Hello there" to PJSIP
  endpoint alice with a display URI of sip:asterisk@mycooldomain.org:

  ari/endpoints/sendMessage?to=pjsip:alice&from=sip:asterisk@mycooldomain.org&body=Hello+There

  This is equivalent to the following as well:

  ari/endpoints/PJSIP/alice/sendMessage?from=sip:asterisk@mycooldomain.org&body=Hello+There

  Both forms are available for message technologies that allow for arbitrary
  destinations, such as chan_sip.

  Inbound messages can now be received over ARI as well. An ARI application that
  subscribes to endpoints will receive messages from those endpoints:

  {
    "type": "TextMessageReceived",
    "timestamp": "2014-07-12T22:53:13.494-0500",
    "endpoint": {
      "technology": "PJSIP",
      "resource": "alice",
      "state": "online",
      "channel_ids": []
    },
    "message": {
      "from": "\"alice\" <sip:alice@127.0.0.1>",
      "to": "pjsip:asterisk@127.0.0.1",
      "body": "Watson, come here.",
      "variables": []
    },
    "application": "testsuite"
  }

  The above was made possible due to some rather major changes in the message
  core. This includes (but is not limited to):
  - Users of the message API can now register message handlers. A handler has
    two callbacks: one to determine if the handler has a destination for the
    message, and another to handle it.
  - All dialplan functionality of handling a message was moved into a message
    handler provided by the message API.
  - Messages can now have the technology/endpoint associated with them.
    Various other properties are also now more easily accessible.
  - A number of ao2 containers that weren't really needed were replaced with
    vectors. Iteration over ao2_containers is expensive and pointless when
    the lifetime of things is well defined and the number of things is very
    small.

  res_stasis now has a new file that makes up its structure, messaging. The
  messaging functionality implements a message handler, and passes received
  messages that match an interested endpoint over to the app for processing.

  Note that inadvertently while testing this, I reproduced ASTERISK-23969.
  res_pjsip_messaging was incorrectly parsing out the 'to' field, such that
  arbitrary SIP URIs mangled the endpoint lookup. This patch includes the
  fix for that as well.

  Review: https://reviewboard.asterisk.org/r/3726

  ASTERISK-23692 #close
  Reported by: Matt Jordan

  ASTERISK-23969 #close
  Reported by: Andrew Nagy
........
  r420090 | mjordan | 2014-08-05 15:16:37 -0500 (Tue, 05 Aug 2014) | 2 lines

  Remove automerge properties :-(
........
  r420097 | mjordan | 2014-08-05 16:36:25 -0500 (Tue, 05 Aug 2014) | 2 lines

  test_message: Fix strict-aliasing compilation issue
........

Merged revisions 420089-420090,420097 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@420098 65c4cc65-6c06-0410-ace0-fbb531ad65f3

22 files changed:
CHANGES
channels/chan_sip.c
include/asterisk/json.h
include/asterisk/manager.h
include/asterisk/message.h
main/json.c
main/message.c
res/ari/ari_model_validators.c
res/ari/ari_model_validators.h
res/ari/resource_channels.c
res/ari/resource_endpoints.c
res/ari/resource_endpoints.h
res/res_ari_endpoints.c
res/res_pjsip_messaging.c
res/res_stasis.c
res/res_xmpp.c
res/stasis/app.c
res/stasis/messaging.c [new file with mode: 0644]
res/stasis/messaging.h [new file with mode: 0644]
rest-api/api-docs/endpoints.json
rest-api/api-docs/events.json
tests/test_message.c [new file with mode: 0644]

diff --git a/CHANGES b/CHANGES
index 14388ad..1974612 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -246,6 +246,17 @@ ARI
      when the recording was started.
    Note that all duration values are reported in seconds.
 
+ * Users of ARI can now send and receive out of call text messages. Messages
+   can be sent directly to a particular endpoint, or can be sent to the
+   endpoints resource directly and inferred from the URI scheme. Text
+   messages are passed to ARI clients as TextMessageReceived events. ARI
+   clients can choose to receive text messages by subscribing to the particular
+   endpoint technology or endpoints that they are interested in.
+
+ * The applications resource now supports subscriptions to all endpoints of
+   a particular channel technology. For example, subscribing to an eventSource
+   of 'endpoint:PJSIP' will subscribe to all PJSIP endpoints.
+
 res_pjsip
 ------------------
  * The endpoint configuration object now supports 'accountcode'. Any channel
index 0eb2dc1..884c284 100644 (file)
@@ -18871,6 +18871,7 @@ static void receive_message(struct sip_pvt *p, struct sip_request *req, struct a
        char *to;
        char from_name[50];
        char stripped[SIPBUFSIZE];
+       enum sip_get_dest_result dest_result;
 
        if (strncmp(content_type, "text/plain", strlen("text/plain"))) { /* No text/plain attachment */
                transmit_response(p, "415 Unsupported Media Type", req); /* Good enough, or? */
@@ -18980,7 +18981,8 @@ static void receive_message(struct sip_pvt *p, struct sip_request *req, struct a
                ast_string_field_set(p, context, sip_cfg.messagecontext);
        }
 
-       switch (get_destination(p, NULL, NULL)) {
+       dest_result = get_destination(p, NULL, NULL);
+       switch (dest_result) {
        case SIP_GET_DEST_REFUSED:
                /* Okay to send 403 since this is after auth processing */
                transmit_response(p, "403 Forbidden", req);
@@ -18990,12 +18992,9 @@ static void receive_message(struct sip_pvt *p, struct sip_request *req, struct a
                transmit_response(p, "416 Unsupported URI Scheme", req);
                sip_scheddestroy(p, DEFAULT_TRANS_TIMEOUT);
                return;
-       case SIP_GET_DEST_EXTEN_NOT_FOUND:
-       case SIP_GET_DEST_EXTEN_MATCHMORE:
-               transmit_response(p, "404 Not Found", req);
-               sip_scheddestroy(p, DEFAULT_TRANS_TIMEOUT);
-               return;
-       case SIP_GET_DEST_EXTEN_FOUND:
+       default:
+               /* We may have something other than dialplan who wants
+                * the message, so defer further error handling for now */
                break;
        }
 
@@ -19023,7 +19022,9 @@ static void receive_message(struct sip_pvt *p, struct sip_request *req, struct a
        res |= ast_msg_set_context(msg, "%s", p->context);
 
        res |= ast_msg_set_var(msg, "SIP_RECVADDR", ast_sockaddr_stringify(&p->recv));
+       res |= ast_msg_set_tech(msg, "%s", "SIP");
        if (!ast_strlen_zero(p->peername)) {
+               res |= ast_msg_set_endpoint(msg, "%s", p->peername);
                res |= ast_msg_set_var(msg, "SIP_PEERNAME", p->peername);
        }
 
@@ -19036,12 +19037,32 @@ static void receive_message(struct sip_pvt *p, struct sip_request *req, struct a
        if (res) {
                ast_msg_destroy(msg);
                transmit_response(p, "500 Internal Server Error", req);
-       } else {
+               sip_scheddestroy(p, DEFAULT_TRANS_TIMEOUT);
+               return;
+       }
+
+       if (ast_msg_has_destination(msg)) {
                ast_msg_queue(msg);
                transmit_response(p, "202 Accepted", req);
+               sip_scheddestroy(p, DEFAULT_TRANS_TIMEOUT);
+               return;
        }
 
+       /* Find a specific error cause to send */
+       switch (dest_result) {
+       case SIP_GET_DEST_EXTEN_NOT_FOUND:
+       case SIP_GET_DEST_EXTEN_MATCHMORE:
+               transmit_response(p, "404 Not Found", req);
+               break;
+       case SIP_GET_DEST_EXTEN_FOUND:
+       default:
+               /* We should have sent the message already! */
+               ast_assert(0);
+               transmit_response(p, "500 Internal Server Error", req);
+               break;
+       }
        sip_scheddestroy(p, DEFAULT_TRANS_TIMEOUT);
+       ast_msg_destroy(msg);
 }
 
 /*! \brief  CLI Command to show calls within limits set by call_limit */
index 6ceeb0b..8cb74a4 100644 (file)
@@ -1010,6 +1010,27 @@ struct ast_party_id;
  */
 struct ast_json *ast_json_party_id(struct ast_party_id *party);
 
+/*!
+ * \brief Convert a \c ast_json list of key/value pair tuples into a \c ast_variable list
+ * \since 12.5.0
+ *
+ * \param json_variables The JSON blob containing the variable
+ * \param variables An out reference to the variables to populate.
+ *        The pointer to the variables should be NULL when calling this.
+ *
+ * \code
+ * struct ast_json *json_variables = ast_json_pack("[ { s: s } ]", "foo", "bar");
+ * struct ast_variable *variables = NULL;
+ * int res;
+ *
+ * res = ast_json_to_ast_variables(json_variables, &variables);
+ * \endcode
+ *
+ * \retval 0 success
+ * \retval -1 error
+ */
+int ast_json_to_ast_variables(struct ast_json *json_variables, struct ast_variable **variables);
+
 /*!@}*/
 
 #endif /* _ASTERISK_JSON_H */
index fad7af1..dda2e54 100644 (file)
 /*! \brief Export manager structures */
 #define AST_MAX_MANHEADERS 128
 
-/*! \brief Manager Helper Function */
-typedef int (*manager_hook_t)(int, const char *, char *);
+/*! \brief Manager Helper Function
+ *
+ * \param category The class authorization category of the event
+ * \param event The name of the event being raised
+ * \param body The body of the event
+ *
+ * \retval 0 Success
+ * \retval non-zero Error
+ */
+typedef int (*manager_hook_t)(int category, const char *event, char *body);
 
 struct manager_custom_hook {
        /*! Identifier */
index 7e5c77a..f81574c 100644 (file)
@@ -25,8 +25,9 @@
  *
  * The purpose of this API is to provide support for text messages that
  * are not session based.  The messages are passed into the Asterisk core
- * to be routed through the dialplan and potentially sent back out through
- * a message technology that has been registered through this API.
+ * to be routed through the dialplan or another interface and potentially
+ * sent back out through a message technology that has been registered
+ * through this API.
  */
 
 #ifndef __AST_MESSAGE_H__
@@ -91,6 +92,64 @@ int ast_msg_tech_register(const struct ast_msg_tech *tech);
 int ast_msg_tech_unregister(const struct ast_msg_tech *tech);
 
 /*!
+ * \brief An external processor of received messages
+ * \since 12.5.0
+ */
+struct ast_msg_handler {
+       /*!
+        * \brief Name of the message handler
+        */
+       const char *name;
+
+       /*!
+        * \brief The function callback that will handle the message
+        *
+        * \param msg The message to handle
+        *
+        * \retval 0 The handler processed the message successfull
+        * \retval non-zero The handler passed or could not process the message
+        */
+       int (* const handle_msg)(struct ast_msg *msg);
+
+       /*!
+        * \brief Return whether or not the message has a valid destination
+        *
+        * A message may be delivered to the dialplan and/or other locations,
+        * depending on whether or not other handlers have been registered. This
+        * function is called by the message core to determine if any handler can
+        * process a message.
+        *
+        * \param msg The message to inspect
+        *
+        * \retval 0 The message does not have a valid destination
+        * \retval 1 The message has a valid destination
+        */
+       int (* const has_destination)(const struct ast_msg *msg);
+};
+
+/*!
+ * \brief Register a \c ast_msg_handler
+ * \since 12.5.0
+ *
+ * \param handler The handler to register
+ *
+ * \retval 0 Success
+ * \retval non-zero Error
+ */
+int ast_msg_handler_register(const struct ast_msg_handler *handler);
+
+/*!
+ * \brief Unregister a \c ast_msg_handler
+ * \since 12.5.0
+ *
+ * \param handler The handler to unregister
+ *
+ * \retval 0 Success
+ * \retval non-zero Error
+ */
+int ast_msg_handler_unregister(const struct ast_msg_handler *handler);
+
+/*!
  * \brief Allocate a message.
  *
  * Allocate a message for the purposes of passing it into the Asterisk core
@@ -162,7 +221,29 @@ int __attribute__((format(printf, 2, 3)))
  */
 int __attribute__((format(printf, 2, 3)))
                ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...);
-       
+
+/*!
+ * \brief Set the technology associated with this message
+ *
+ * \since 12.5.0
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int __attribute__((format(printf, 2, 3)))
+               ast_msg_set_tech(struct ast_msg *msg, const char *fmt, ...);
+
+/*!
+ * \brief Set the technology's endpoint associated with this message
+ *
+ * \since 12.5.0
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int __attribute__((format(printf, 2, 3)))
+               ast_msg_set_endpoint(struct ast_msg *msg, const char *fmt, ...);
+
 /*!
  * \brief Set a variable on the message going to the dialplan.
  * \note Setting a variable that already exists overwrites the existing variable value
@@ -209,6 +290,66 @@ const char *ast_msg_get_var(struct ast_msg *msg, const char *name);
 const char *ast_msg_get_body(const struct ast_msg *msg);
 
 /*!
+ * \brief Retrieve the source of this message
+ *
+ * \since 12.5.0
+ *
+ * \param msg The message to get the soure from
+ *
+ * \retval The source of the message
+ * \retval NULL or empty string if the message has no source
+ */
+const char *ast_msg_get_from(const struct ast_msg *msg);
+
+/*!
+ * \brief Retrieve the destination of this message
+ *
+ * \since 12.5.0
+ *
+ * \param msg The message to get the destination from
+ *
+ * \retval The destination of the message
+ * \retval NULL or empty string if the message has no destination
+ */
+const char *ast_msg_get_to(const struct ast_msg *msg);
+
+/*!
+ * \brief Retrieve the technology associated with this message
+ *
+ * \since 12.5.0
+ *
+ * \param msg The message to get the technology from
+ *
+ * \retval The technology of the message
+ * \retval NULL or empty string if the message has no associated technology
+ */
+const char *ast_msg_get_tech(const struct ast_msg *msg);
+
+/*!
+ * \brief Retrieve the endpoint associated with this message
+ *
+ * \since 12.5.0
+ *
+ * \param msg The message to get the endpoint from
+ *
+ * \retval The endpoint associated with the message
+ * \retval NULL or empty string if the message has no associated endpoint
+ */
+const char *ast_msg_get_endpoint(const struct ast_msg *msg);
+
+/*!
+ * \brief Determine if a particular message has a destination via some handler
+ *
+ * \since 12.5.0
+ *
+ * \param msg The message to check
+ *
+ * \retval 0 if the message has no handler that can find a destination
+ * \retval 1 if the message has a handler that can find a destination
+ */
+int ast_msg_has_destination(const struct ast_msg *msg);
+
+/*!
  * \brief Queue a message for routing through the dialplan.
  *
  * Regardless of the return value of this function, this funciton will take
index 259b41e..88e8077 100644 (file)
@@ -881,3 +881,33 @@ struct ast_json *ast_json_party_id(struct ast_party_id *party)
 
        return ast_json_ref(json_party_id);
 }
+
+int ast_json_to_ast_variables(struct ast_json *json_variables, struct ast_variable **variables)
+{
+       struct ast_json_iter *it_json_var;
+
+       *variables = NULL;
+
+       for (it_json_var = ast_json_object_iter(json_variables); it_json_var;
+                it_json_var = ast_json_object_iter_next(json_variables, it_json_var)) {
+               struct ast_variable *new_var;
+               const char *key = ast_json_object_iter_key(it_json_var);
+
+               if (ast_strlen_zero(key)) {
+                       continue;
+               }
+
+               new_var = ast_variable_new(key,
+                                          ast_json_string_get(ast_json_object_iter_value(it_json_var)),
+                                          "");
+               if (!new_var) {
+                       ast_variables_destroy(*variables);
+                       *variables = NULL;
+                       return -1;
+               }
+
+               ast_variable_list_append(variables, new_var);
+       }
+
+       return 0;
+}
index 8acfeb3..3a597b8 100644 (file)
@@ -39,6 +39,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/manager.h"
 #include "asterisk/strings.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/vector.h"
 #include "asterisk/app.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/message.h"
@@ -201,37 +202,46 @@ struct msg_data {
                AST_STRING_FIELD(name);
                AST_STRING_FIELD(value);
        );
-       unsigned int send:1; /* Whether to send out on outbound messages */
+       unsigned int send; /* Whether to send out on outbound messages */
 };
 
 AST_LIST_HEAD_NOLOCK(outhead, msg_data);
 
 /*!
  * \brief A message.
- *
- * \todo Consider whether stringfields would be an appropriate optimization here.
  */
 struct ast_msg {
-       struct ast_str *to;
-       struct ast_str *from;
-       struct ast_str *body;
-       struct ast_str *context;
-       struct ast_str *exten;
+       AST_DECLARE_STRING_FIELDS(
+               /*! Where the message is going */
+               AST_STRING_FIELD(to);
+               /*! Where we "say" the message came from */
+               AST_STRING_FIELD(from);
+               /*! The text to send */
+               AST_STRING_FIELD(body);
+               /*! The dialplan context for the message */
+               AST_STRING_FIELD(context);
+               /*! The dialplan extension for the message */
+               AST_STRING_FIELD(exten);
+               /*! An endpoint associated with this message */
+               AST_STRING_FIELD(endpoint);
+               /*! The technology of the endpoint associated with this message */
+               AST_STRING_FIELD(tech);
+       );
+       /*! Technology/dialplan specific variables associated with the message */
        struct ao2_container *vars;
 };
 
-struct ast_msg_tech_holder {
-       const struct ast_msg_tech *tech;
-       /*!
-        * \brief A rwlock for this object
-        *
-        * a read/write lock must be used to protect the wrapper instead
-        * of the ao2 lock. A rdlock must be held to read tech_holder->tech.
-        */
-       ast_rwlock_t tech_lock;
-};
+/*! \brief Lock for \c msg_techs vector */
+static ast_rwlock_t msg_techs_lock;
+
+/*! \brief Vector of message technologies */
+AST_VECTOR(, const struct ast_msg_tech *) msg_techs;
+
+/*! \brief Lock for \c msg_handlers vector */
+static ast_rwlock_t msg_handlers_lock;
 
-static struct ao2_container *msg_techs;
+/*! \brief Vector of received message handlers */
+AST_VECTOR(, const struct ast_msg_handler *) msg_handlers;
 
 static struct ast_taskprocessor *msg_q_tp;
 
@@ -387,21 +397,7 @@ static void msg_destructor(void *obj)
 {
        struct ast_msg *msg = obj;
 
-       ast_free(msg->to);
-       msg->to = NULL;
-
-       ast_free(msg->from);
-       msg->from = NULL;
-
-       ast_free(msg->body);
-       msg->body = NULL;
-
-       ast_free(msg->context);
-       msg->context = NULL;
-
-       ast_free(msg->exten);
-       msg->exten = NULL;
-
+       ast_string_field_free_memory(msg);
        ao2_ref(msg->vars, -1);
 }
 
@@ -413,27 +409,7 @@ struct ast_msg *ast_msg_alloc(void)
                return NULL;
        }
 
-       if (!(msg->to = ast_str_create(32))) {
-               ao2_ref(msg, -1);
-               return NULL;
-       }
-
-       if (!(msg->from = ast_str_create(32))) {
-               ao2_ref(msg, -1);
-               return NULL;
-       }
-
-       if (!(msg->body = ast_str_create(128))) {
-               ao2_ref(msg, -1);
-               return NULL;
-       }
-
-       if (!(msg->context = ast_str_create(16))) {
-               ao2_ref(msg, -1);
-               return NULL;
-       }
-
-       if (!(msg->exten = ast_str_create(16))) {
+       if (ast_string_field_init(msg, 128)) {
                ao2_ref(msg, -1);
                return NULL;
        }
@@ -442,8 +418,7 @@ struct ast_msg *ast_msg_alloc(void)
                ao2_ref(msg, -1);
                return NULL;
        }
-
-       ast_str_set(&msg->context, 0, "default");
+       ast_string_field_set(msg, context, "default");
 
        return msg;
 }
@@ -457,73 +432,109 @@ struct ast_msg *ast_msg_ref(struct ast_msg *msg)
 struct ast_msg *ast_msg_destroy(struct ast_msg *msg)
 {
        ao2_ref(msg, -1);
-
        return NULL;
 }
 
 int ast_msg_set_to(struct ast_msg *msg, const char *fmt, ...)
 {
        va_list ap;
-       int res;
 
        va_start(ap, fmt);
-       res = ast_str_set_va(&msg->to, 0, fmt, ap);
+       ast_string_field_build_va(msg, to, fmt, ap);
        va_end(ap);
 
-       return res < 0 ? -1 : 0;
+       return 0;
 }
 
 int ast_msg_set_from(struct ast_msg *msg, const char *fmt, ...)
 {
        va_list ap;
-       int res;
 
        va_start(ap, fmt);
-       res = ast_str_set_va(&msg->from, 0, fmt, ap);
+       ast_string_field_build_va(msg, from, fmt, ap);
        va_end(ap);
 
-       return res < 0 ? -1 : 0;
+       return 0;
 }
 
 int ast_msg_set_body(struct ast_msg *msg, const char *fmt, ...)
 {
        va_list ap;
-       int res;
 
        va_start(ap, fmt);
-       res = ast_str_set_va(&msg->body, 0, fmt, ap);
+       ast_string_field_build_va(msg, body, fmt, ap);
        va_end(ap);
 
-       return res < 0 ? -1 : 0;
+       return 0;
 }
 
 int ast_msg_set_context(struct ast_msg *msg, const char *fmt, ...)
 {
        va_list ap;
-       int res;
 
        va_start(ap, fmt);
-       res = ast_str_set_va(&msg->context, 0, fmt, ap);
+       ast_string_field_build_va(msg, context, fmt, ap);
        va_end(ap);
 
-       return res < 0 ? -1 : 0;
+       return 0;
 }
 
 int ast_msg_set_exten(struct ast_msg *msg, const char *fmt, ...)
 {
        va_list ap;
-       int res;
 
        va_start(ap, fmt);
-       res = ast_str_set_va(&msg->exten, 0, fmt, ap);
+       ast_string_field_build_va(msg, exten, fmt, ap);
        va_end(ap);
 
-       return res < 0 ? -1 : 0;
+       return 0;
+}
+
+int ast_msg_set_tech(struct ast_msg *msg, const char *fmt, ...)
+{
+       va_list ap;
+
+       va_start(ap, fmt);
+       ast_string_field_build_va(msg, tech, fmt, ap);
+       va_end(ap);
+
+       return 0;
+}
+
+int ast_msg_set_endpoint(struct ast_msg *msg, const char *fmt, ...)
+{
+       va_list ap;
+
+       va_start(ap, fmt);
+       ast_string_field_build_va(msg, endpoint, fmt, ap);
+       va_end(ap);
+
+       return 0;
 }
 
 const char *ast_msg_get_body(const struct ast_msg *msg)
 {
-       return ast_str_buffer(msg->body);
+       return msg->body;
+}
+
+const char *ast_msg_get_from(const struct ast_msg *msg)
+{
+       return msg->from;
+}
+
+const char *ast_msg_get_to(const struct ast_msg *msg)
+{
+       return msg->to;
+}
+
+const char *ast_msg_get_tech(const struct ast_msg *msg)
+{
+       return msg->tech;
+}
+
+const char *ast_msg_get_endpoint(const struct ast_msg *msg)
+{
+       return msg->endpoint;
 }
 
 static struct msg_data *msg_data_alloc(void)
@@ -713,7 +724,7 @@ static void msg_route(struct ast_channel *chan, struct ast_msg *msg)
 {
        struct ast_pbx_args pbx_args;
 
-       ast_explicit_goto(chan, ast_str_buffer(msg->context), AS_OR(msg->exten, "s"), 1);
+       ast_explicit_goto(chan, msg->context, S_OR(msg->exten, "s"), 1);
 
        memset(&pbx_args, 0, sizeof(pbx_args));
        pbx_args.no_hangup_chan = 1,
@@ -787,18 +798,9 @@ static void destroy_msg_q_chan(void *data)
 
 AST_THREADSTORAGE_CUSTOM(msg_q_chan, NULL, destroy_msg_q_chan);
 
-/*!
- * \internal
- * \brief Message queue task processor callback
- *
- * \retval 0 success
- * \retval -1 failure
- *
- * \note Even though this returns a value, the taskprocessor code ignores the value.
- */
-static int msg_q_cb(void *data)
+/*! \internal \brief Handle a message bound for the dialplan */
+static int dialplan_handle_msg_cb(struct ast_msg *msg)
 {
-       struct ast_msg *msg = data;
        struct ast_channel **chan_p, *chan;
        struct ast_datastore *ds;
 
@@ -824,15 +826,90 @@ static int msg_q_cb(void *data)
        msg_route(chan, msg);
        chan_cleanup(chan);
 
+       return 0;
+}
+
+/*! \internal \brief Determine if a message has a destination in the dialplan */
+static int dialplan_has_destination_cb(const struct ast_msg *msg)
+{
+       if (ast_strlen_zero(msg->context)) {
+               return 0;
+       }
+
+       return ast_exists_extension(NULL, msg->context, S_OR(msg->exten, "s"), 1, NULL);
+}
+
+static struct ast_msg_handler dialplan_msg_handler = {
+       .name = "dialplan",
+       .handle_msg = dialplan_handle_msg_cb,
+       .has_destination = dialplan_has_destination_cb,
+};
+
+/*!
+ * \internal
+ * \brief Message queue task processor callback
+ *
+ * \retval 0 success
+ * \retval non-zero failure
+ *
+ * \note Even though this returns a value, the taskprocessor code ignores the value.
+ */
+static int msg_q_cb(void *data)
+{
+       struct ast_msg *msg = data;
+       int res = 1;
+       int i;
+
+       ast_rwlock_rdlock(&msg_handlers_lock);
+       for (i = 0; i < AST_VECTOR_SIZE(&msg_handlers); i++) {
+               const struct ast_msg_handler *handler = AST_VECTOR_GET(&msg_handlers, i);
+
+               if (!handler->has_destination(msg)) {
+                       ast_debug(5, "Handler %s doesn't want message, moving on\n", handler->name);
+                       continue;
+               }
+
+               ast_debug(5, "Dispatching message to %s handler", handler->name);
+               res &= handler->handle_msg(msg);
+       }
+       ast_rwlock_unlock(&msg_handlers_lock);
+
+       if (res != 0) {
+               ast_log(LOG_WARNING, "No handler processed message from %s to %s\n",
+                       S_OR(msg->from, "<unknown>"), S_OR(msg->to, "<unknown>"));
+       }
+
        ao2_ref(msg, -1);
 
-       return 0;
+       return res;
+}
+
+int ast_msg_has_destination(const struct ast_msg *msg)
+{
+       int i;
+       int result = 0;
+
+       ast_rwlock_rdlock(&msg_handlers_lock);
+       for (i = 0; i < AST_VECTOR_SIZE(&msg_handlers); i++) {
+               const struct ast_msg_handler *handler = AST_VECTOR_GET(&msg_handlers, i);
+
+               ast_debug(5, "Seeing if %s can handle message\n", handler->name);
+               if (handler->has_destination(msg)) {
+                       ast_debug(5, "%s can handle message\n", handler->name);
+                       result = 1;
+                       break;
+               }
+       }
+       ast_rwlock_unlock(&msg_handlers_lock);
+
+       return result;
 }
 
 int ast_msg_queue(struct ast_msg *msg)
 {
        int res;
-
+       ast_log(LOG_ERROR, "@@@@@ to: %s from: %s exten: %s context: %s\n",
+               msg->to, msg->from, msg->exten, msg->context);
        res = ast_taskprocessor_push(msg_q_tp, msg_q_cb, msg);
        if (res == -1) {
                ao2_ref(msg, -1);
@@ -899,11 +976,11 @@ static int msg_func_read(struct ast_channel *chan, const char *function,
        ao2_lock(msg);
 
        if (!strcasecmp(data, "to")) {
-               ast_copy_string(buf, ast_str_buffer(msg->to), len);
+               ast_copy_string(buf, msg->to, len);
        } else if (!strcasecmp(data, "from")) {
-               ast_copy_string(buf, ast_str_buffer(msg->from), len);
+               ast_copy_string(buf, msg->from, len);
        } else if (!strcasecmp(data, "body")) {
-               ast_copy_string(buf, ast_msg_get_body(msg), len);
+               ast_copy_string(buf, msg->body, len);
        } else {
                ast_log(LOG_WARNING, "Invalid argument to MESSAGE(): '%s'\n", data);
        }
@@ -1041,57 +1118,57 @@ static int msg_data_func_write(struct ast_channel *chan, const char *function,
 
        return 0;
 }
-static int msg_tech_hash(const void *obj, const int flags)
-{
-       struct ast_msg_tech_holder *tech_holder = (struct ast_msg_tech_holder *) obj;
-       int res = 0;
 
-       ast_rwlock_rdlock(&tech_holder->tech_lock);
-       if (tech_holder->tech) {
-               res = ast_str_case_hash(tech_holder->tech->name);
-       }
-       ast_rwlock_unlock(&tech_holder->tech_lock);
-
-       return res;
-}
-
-static int msg_tech_cmp(void *obj, void *arg, int flags)
+/*!
+ * \internal \brief Find a \c ast_msg_tech by its technology name
+ *
+ * \param tech_name The name of the message technology
+ *
+ * \note \c msg_techs should be locked via \c msg_techs_lock prior to
+ *       calling this function
+ *
+ * \retval NULL if no \c ast_msg_tech has been registered
+ * \retval \c ast_msg_tech if registered
+ */
+static const struct ast_msg_tech *msg_find_by_tech_name(const char *tech_name)
 {
-       struct ast_msg_tech_holder *tech_holder = obj;
-       const struct ast_msg_tech_holder *tech_holder2 = arg;
-       int res = 1;
+       const struct ast_msg_tech *current;
+       int i;
 
-       ast_rwlock_rdlock(&tech_holder->tech_lock);
-       /*
-        * tech_holder2 is a temporary fake tech_holder.
-        */
-       if (tech_holder->tech) {
-               res = strcasecmp(tech_holder->tech->name, tech_holder2->tech->name) ? 0 : CMP_MATCH | CMP_STOP;
+       for (i = 0; i < AST_VECTOR_SIZE(&msg_techs); i++) {
+               current = AST_VECTOR_GET(&msg_techs, i);
+               if (!strcmp(current->name, tech_name)) {
+                       return current;
+               }
        }
-       ast_rwlock_unlock(&tech_holder->tech_lock);
 
-       return res;
+       return NULL;
 }
 
-static struct ast_msg_tech_holder *msg_find_by_tech(const struct ast_msg_tech *msg_tech, int ao2_flags)
+/*!
+ * \internal \brief Find a \c ast_msg_handler by its technology name
+ *
+ * \param tech_name The name of the message technology
+ *
+ * \note \c msg_handlers should be locked via \c msg_handlers_lock
+ *       prior to calling this function
+ *
+ * \retval NULL if no \c ast_msg_handler has been registered
+ * \retval \c ast_msg_handler if registered
+ */
+static const struct ast_msg_handler *msg_handler_find_by_tech_name(const char *tech_name)
 {
-       struct ast_msg_tech_holder *tech_holder;
-       struct ast_msg_tech_holder tmp_tech_holder = {
-               .tech = msg_tech,
-       };
+       const struct ast_msg_handler *current;
+       int i;
 
-       ast_rwlock_init(&tmp_tech_holder.tech_lock);
-       tech_holder = ao2_find(msg_techs, &tmp_tech_holder, ao2_flags);
-       ast_rwlock_destroy(&tmp_tech_holder.tech_lock);
-       return tech_holder;
-}
+       for (i = 0; i < AST_VECTOR_SIZE(&msg_handlers); i++) {
+               current = AST_VECTOR_GET(&msg_handlers, i);
+               if (!strcmp(current->name, tech_name)) {
+                       return current;
+               }
+       }
 
-static struct ast_msg_tech_holder *msg_find_by_tech_name(const char *tech_name, int ao2_flags)
-{
-       struct ast_msg_tech tmp_msg_tech = {
-               .name = tech_name,
-       };
-       return msg_find_by_tech(&tmp_msg_tech, ao2_flags);
+       return NULL;
 }
 
 /*!
@@ -1103,7 +1180,7 @@ static int msg_send_exec(struct ast_channel *chan, const char *data)
        struct ast_datastore *ds;
        struct ast_msg *msg;
        char *tech_name;
-       struct ast_msg_tech_holder *tech_holder = NULL;
+       const struct ast_msg_tech *msg_tech;
        char *parse;
        int res = -1;
        AST_DECLARE_APP_ARGS(args,
@@ -1142,9 +1219,10 @@ static int msg_send_exec(struct ast_channel *chan, const char *data)
        tech_name = ast_strdupa(args.to);
        tech_name = strsep(&tech_name, ":");
 
-       tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
+       ast_rwlock_rdlock(&msg_techs_lock);
+       msg_tech = msg_find_by_tech_name(tech_name);
 
-       if (!tech_holder) {
+       if (!msg_tech) {
                ast_log(LOG_WARNING, "No message technology '%s' found.\n", tech_name);
                pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", "INVALID_PROTOCOL");
                goto exit_cleanup;
@@ -1156,22 +1234,13 @@ static int msg_send_exec(struct ast_channel *chan, const char *data)
         * that they could change.
         */
        ao2_lock(msg);
-       ast_rwlock_rdlock(&tech_holder->tech_lock);
-       if (tech_holder->tech) {
-               res = tech_holder->tech->msg_send(msg, S_OR(args.to, ""),
-                                                       S_OR(args.from, ""));
-       }
-       ast_rwlock_unlock(&tech_holder->tech_lock);
+       res = msg_tech->msg_send(msg, S_OR(args.to, ""), S_OR(args.from, ""));
        ao2_unlock(msg);
 
        pbx_builtin_setvar_helper(chan, "MESSAGE_SEND_STATUS", res ? "FAILURE" : "SUCCESS");
 
 exit_cleanup:
-       if (tech_holder) {
-               ao2_ref(tech_holder, -1);
-               tech_holder = NULL;
-       }
-
+       ast_rwlock_unlock(&msg_techs_lock);
        ao2_ref(msg, -1);
 
        return 0;
@@ -1187,7 +1256,7 @@ static int action_messagesend(struct mansession *s, const struct message *m)
        char *tech_name = NULL;
        struct ast_variable *vars = NULL;
        struct ast_variable *data = NULL;
-       struct ast_msg_tech_holder *tech_holder = NULL;
+       const struct ast_msg_tech *msg_tech;
        struct ast_msg *msg;
        int res = -1;
 
@@ -1204,15 +1273,16 @@ static int action_messagesend(struct mansession *s, const struct message *m)
        tech_name = ast_strdupa(to);
        tech_name = strsep(&tech_name, ":");
 
-       tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
-
-       if (!tech_holder) {
+       ast_rwlock_rdlock(&msg_techs_lock);
+       msg_tech = msg_find_by_tech_name(tech_name);
+       if (!msg_tech) {
+               ast_rwlock_unlock(&msg_techs_lock);
                astman_send_error(s, m, "Message technology not found.");
-               return -1;
+               return 0;
        }
 
        if (!(msg = ast_msg_alloc())) {
-               ao2_ref(tech_holder, -1);
+               ast_rwlock_unlock(&msg_techs_lock);
                astman_send_error(s, m, "Internal failure\n");
                return -1;
        }
@@ -1224,14 +1294,11 @@ static int action_messagesend(struct mansession *s, const struct message *m)
 
        ast_msg_set_body(msg, "%s", body);
 
-       ast_rwlock_rdlock(&tech_holder->tech_lock);
-       if (tech_holder->tech) {
-               res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
-       }
-       ast_rwlock_unlock(&tech_holder->tech_lock);
+       res = msg_tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
+
+       ast_rwlock_unlock(&msg_techs_lock);
 
        ast_variables_destroy(vars);
-       ao2_ref(tech_holder, -1);
        ao2_ref(msg, -1);
 
        if (res) {
@@ -1245,7 +1312,7 @@ static int action_messagesend(struct mansession *s, const struct message *m)
 int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
 {
        char *tech_name = NULL;
-       struct ast_msg_tech_holder *tech_holder = NULL;
+       const struct ast_msg_tech *msg_tech;
        int res = -1;
 
        if (ast_strlen_zero(to)) {
@@ -1256,20 +1323,19 @@ int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
        tech_name = ast_strdupa(to);
        tech_name = strsep(&tech_name, ":");
 
-       tech_holder = msg_find_by_tech_name(tech_name, OBJ_POINTER);
+       ast_rwlock_rdlock(&msg_techs_lock);
+       msg_tech = msg_find_by_tech_name(tech_name);
 
-       if (!tech_holder) {
-               ao2_ref(msg, -1);
+       if (!msg_tech) {
+               ast_log(LOG_ERROR, "Unknown message tech: %s\n", tech_name);
+               ast_rwlock_unlock(&msg_techs_lock);
                return -1;
        }
 
-       ast_rwlock_rdlock(&tech_holder->tech_lock);
-       if (tech_holder->tech) {
-               res = tech_holder->tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
-       }
-       ast_rwlock_unlock(&tech_holder->tech_lock);
+       res = msg_tech->msg_send(msg, S_OR(to, ""), S_OR(from, ""));
+
+       ast_rwlock_unlock(&msg_techs_lock);
 
-       ao2_ref(tech_holder, -1);
        ao2_ref(msg, -1);
 
        return res;
@@ -1277,52 +1343,111 @@ int ast_msg_send(struct ast_msg *msg, const char *to, const char *from)
 
 int ast_msg_tech_register(const struct ast_msg_tech *tech)
 {
-       struct ast_msg_tech_holder *tech_holder;
+       const struct ast_msg_tech *match;
 
-       if ((tech_holder = msg_find_by_tech(tech, OBJ_POINTER))) {
-               ao2_ref(tech_holder, -1);
+       ast_rwlock_wrlock(&msg_techs_lock);
+
+       match = msg_find_by_tech_name(tech->name);
+       if (match) {
                ast_log(LOG_ERROR, "Message technology already registered for '%s'\n",
-                               tech->name);
+                       tech->name);
+               ast_rwlock_unlock(&msg_techs_lock);
                return -1;
        }
 
-       if (!(tech_holder = ao2_alloc(sizeof(*tech_holder), NULL))) {
-               return -1;
-       }
+       AST_VECTOR_APPEND(&msg_techs, tech);
+       ast_verb(3, "Message technology '%s' registered.\n", tech->name);
+
+       ast_rwlock_unlock(&msg_techs_lock);
+
+       return 0;
+}
 
-       ast_rwlock_init(&tech_holder->tech_lock);
-       tech_holder->tech = tech;
+/*!
+ * \brief Comparison callback for \c ast_msg_tech vector removal
+ *
+ * \param vec_elem The element in the vector being compared
+ * \param srch The element being looked up
+ *
+ * \retval non-zero The items are equal
+ * \retval 0 The items are not equal
+ */
+static int msg_tech_cmp(const struct ast_msg_tech *vec_elem, const struct ast_msg_tech *srch)
+{
+       return !strcmp(vec_elem->name, srch->name);
+}
 
-       ao2_link(msg_techs, tech_holder);
+int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
+{
+       int match;
+
+       ast_rwlock_wrlock(&msg_techs_lock);
+       match = AST_VECTOR_REMOVE_CMP_UNORDERED(&msg_techs, tech, msg_tech_cmp,
+                                               AST_VECTOR_ELEM_CLEANUP_NOOP);
+       ast_rwlock_unlock(&msg_techs_lock);
 
-       ao2_ref(tech_holder, -1);
-       tech_holder = NULL;
+       if (match) {
+               ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
+               return -1;
+       }
 
-       ast_verb(3, "Message technology handler '%s' registered.\n", tech->name);
+       ast_verb(2, "Message technology '%s' unregistered.\n", tech->name);
 
        return 0;
 }
 
-int ast_msg_tech_unregister(const struct ast_msg_tech *tech)
+int ast_msg_handler_register(const struct ast_msg_handler *handler)
 {
-       struct ast_msg_tech_holder *tech_holder;
+       const struct ast_msg_handler *match;
 
-       tech_holder = msg_find_by_tech(tech, OBJ_POINTER | OBJ_UNLINK);
+       ast_rwlock_wrlock(&msg_handlers_lock);
 
-       if (!tech_holder) {
-               ast_log(LOG_ERROR, "No '%s' message technology found.\n", tech->name);
+       match = msg_handler_find_by_tech_name(handler->name);
+       if (match) {
+               ast_log(LOG_ERROR, "Message handler already registered for '%s'\n",
+                       handler->name);
+               ast_rwlock_unlock(&msg_handlers_lock);
                return -1;
        }
 
-       ast_rwlock_wrlock(&tech_holder->tech_lock);
-       tech_holder->tech = NULL;
-       ast_rwlock_unlock(&tech_holder->tech_lock);
+       AST_VECTOR_APPEND(&msg_handlers, handler);
+       ast_verb(2, "Message handler '%s' registered.\n", handler->name);
 
-       ao2_ref(tech_holder, -1);
-       tech_holder = NULL;
+       ast_rwlock_unlock(&msg_handlers_lock);
 
-       ast_verb(3, "Message technology handler '%s' unregistered.\n", tech->name);
+       return 0;
 
+}
+
+/*!
+ * \brief Comparison callback for \c ast_msg_handler vector removal
+ *
+ * \param vec_elem The element in the vector being compared
+ * \param srch The element being looked up
+ *
+ * \retval non-zero The items are equal
+ * \retval 0 The items are not equal
+ */
+static int msg_handler_cmp(const struct ast_msg_handler *vec_elem, const struct ast_msg_handler *srch)
+{
+       return !strcmp(vec_elem->name, srch->name);
+}
+
+int ast_msg_handler_unregister(const struct ast_msg_handler *handler)
+{
+       int match;
+
+       ast_rwlock_wrlock(&msg_handlers_lock);
+       match = AST_VECTOR_REMOVE_CMP_UNORDERED(&msg_handlers, handler, msg_handler_cmp,
+                                               AST_VECTOR_ELEM_CLEANUP_NOOP);
+       ast_rwlock_unlock(&msg_handlers_lock);
+
+       if (match) {
+               ast_log(LOG_ERROR, "No '%s' message handler found.\n", handler->name);
+               return -1;
+       }
+
+       ast_verb(3, "Message handler '%s' unregistered.\n", handler->name);
        return 0;
 }
 
@@ -1343,15 +1468,18 @@ void ast_msg_shutdown(void)
  */
 static void message_shutdown(void)
 {
+       ast_msg_handler_unregister(&dialplan_msg_handler);
+
        ast_custom_function_unregister(&msg_function);
        ast_custom_function_unregister(&msg_data_function);
        ast_unregister_application(app_msg_send);
        ast_manager_unregister("MessageSend");
 
-       if (msg_techs) {
-               ao2_ref(msg_techs, -1);
-               msg_techs = NULL;
-       }
+       AST_VECTOR_FREE(&msg_techs);
+       ast_rwlock_destroy(&msg_techs_lock);
+
+       AST_VECTOR_FREE(&msg_handlers);
+       ast_rwlock_destroy(&msg_handlers_lock);
 }
 
 /*
@@ -1373,12 +1501,19 @@ int ast_msg_init(void)
                return -1;
        }
 
-       msg_techs = ao2_container_alloc(17, msg_tech_hash, msg_tech_cmp);
-       if (!msg_techs) {
+       ast_rwlock_init(&msg_techs_lock);
+       if (AST_VECTOR_INIT(&msg_techs, 8)) {
+               return -1;
+       }
+
+       ast_rwlock_init(&msg_handlers_lock);
+       if (AST_VECTOR_INIT(&msg_handlers, 4)) {
                return -1;
        }
 
-       res = __ast_custom_function_register(&msg_function, NULL);
+       res = ast_msg_handler_register(&dialplan_msg_handler);
+
+       res |= __ast_custom_function_register(&msg_function, NULL);
        res |= __ast_custom_function_register(&msg_data_function, NULL);
        res |= ast_register_application2(app_msg_send, msg_send_exec, NULL, NULL, NULL);
        res |= ast_manager_register_xml_core("MessageSend", EVENT_FLAG_MESSAGE, action_messagesend);
index 0302db3..10fd3bd 100644 (file)
@@ -588,6 +588,140 @@ ari_validator ast_ari_validate_endpoint_fn(void)
        return ast_ari_validate_endpoint;
 }
 
+int ast_ari_validate_text_message(struct ast_json *json)
+{
+       int res = 1;
+       struct ast_json_iter *iter;
+       int has_body = 0;
+       int has_from = 0;
+       int has_to = 0;
+
+       for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+               if (strcmp("body", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_body = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessage field body failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("from", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_from = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessage field from failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("to", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_to = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessage field to failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("variables", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       prop_is_valid = ast_ari_validate_list(
+                               ast_json_object_iter_value(iter),
+                               ast_ari_validate_text_message_variable);
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessage field variables failed validation\n");
+                               res = 0;
+                       }
+               } else
+               {
+                       ast_log(LOG_ERROR,
+                               "ARI TextMessage has undocumented field %s\n",
+                               ast_json_object_iter_key(iter));
+                       res = 0;
+               }
+       }
+
+       if (!has_body) {
+               ast_log(LOG_ERROR, "ARI TextMessage missing required field body\n");
+               res = 0;
+       }
+
+       if (!has_from) {
+               ast_log(LOG_ERROR, "ARI TextMessage missing required field from\n");
+               res = 0;
+       }
+
+       if (!has_to) {
+               ast_log(LOG_ERROR, "ARI TextMessage missing required field to\n");
+               res = 0;
+       }
+
+       return res;
+}
+
+ari_validator ast_ari_validate_text_message_fn(void)
+{
+       return ast_ari_validate_text_message;
+}
+
+int ast_ari_validate_text_message_variable(struct ast_json *json)
+{
+       int res = 1;
+       struct ast_json_iter *iter;
+       int has_key = 0;
+       int has_value = 0;
+
+       for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+               if (strcmp("key", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_key = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageVariable field key failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("value", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_value = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageVariable field value failed validation\n");
+                               res = 0;
+                       }
+               } else
+               {
+                       ast_log(LOG_ERROR,
+                               "ARI TextMessageVariable has undocumented field %s\n",
+                               ast_json_object_iter_key(iter));
+                       res = 0;
+               }
+       }
+
+       if (!has_key) {
+               ast_log(LOG_ERROR, "ARI TextMessageVariable missing required field key\n");
+               res = 0;
+       }
+
+       if (!has_value) {
+               ast_log(LOG_ERROR, "ARI TextMessageVariable missing required field value\n");
+               res = 0;
+       }
+
+       return res;
+}
+
+ari_validator ast_ari_validate_text_message_variable_fn(void)
+{
+       return ast_ari_validate_text_message_variable;
+}
+
 int ast_ari_validate_caller_id(struct ast_json *json)
 {
        int res = 1;
@@ -3890,6 +4024,9 @@ int ast_ari_validate_event(struct ast_json *json)
        if (strcmp("StasisStart", discriminator) == 0) {
                return ast_ari_validate_stasis_start(json);
        } else
+       if (strcmp("TextMessageReceived", discriminator) == 0) {
+               return ast_ari_validate_text_message_received(json);
+       } else
        {
                ast_log(LOG_ERROR, "ARI Event has undocumented subtype %s\n",
                        discriminator);
@@ -4061,6 +4198,9 @@ int ast_ari_validate_message(struct ast_json *json)
        if (strcmp("StasisStart", discriminator) == 0) {
                return ast_ari_validate_stasis_start(json);
        } else
+       if (strcmp("TextMessageReceived", discriminator) == 0) {
+               return ast_ari_validate_text_message_received(json);
+       } else
        {
                ast_log(LOG_ERROR, "ARI Message has undocumented subtype %s\n",
                        discriminator);
@@ -4724,6 +4864,94 @@ ari_validator ast_ari_validate_stasis_start_fn(void)
        return ast_ari_validate_stasis_start;
 }
 
+int ast_ari_validate_text_message_received(struct ast_json *json)
+{
+       int res = 1;
+       struct ast_json_iter *iter;
+       int has_type = 0;
+       int has_application = 0;
+       int has_message = 0;
+
+       for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+               if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_type = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageReceived field type failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("application", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_application = 1;
+                       prop_is_valid = ast_ari_validate_string(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageReceived field application failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       prop_is_valid = ast_ari_validate_date(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageReceived field timestamp failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       prop_is_valid = ast_ari_validate_endpoint(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageReceived field endpoint failed validation\n");
+                               res = 0;
+                       }
+               } else
+               if (strcmp("message", ast_json_object_iter_key(iter)) == 0) {
+                       int prop_is_valid;
+                       has_message = 1;
+                       prop_is_valid = ast_ari_validate_text_message(
+                               ast_json_object_iter_value(iter));
+                       if (!prop_is_valid) {
+                               ast_log(LOG_ERROR, "ARI TextMessageReceived field message failed validation\n");
+                               res = 0;
+                       }
+               } else
+               {
+                       ast_log(LOG_ERROR,
+                               "ARI TextMessageReceived has undocumented field %s\n",
+                               ast_json_object_iter_key(iter));
+                       res = 0;
+               }
+       }
+
+       if (!has_type) {
+               ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field type\n");
+               res = 0;
+       }
+
+       if (!has_application) {
+               ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field application\n");
+               res = 0;
+       }
+
+       if (!has_message) {
+               ast_log(LOG_ERROR, "ARI TextMessageReceived missing required field message\n");
+               res = 0;
+       }
+
+       return res;
+}
+
+ari_validator ast_ari_validate_text_message_received_fn(void)
+{
+       return ast_ari_validate_text_message_received;
+}
+
 int ast_ari_validate_application(struct ast_json *json)
 {
        int res = 1;
index 0186168..beace67 100644 (file)
@@ -299,6 +299,42 @@ int ast_ari_validate_endpoint(struct ast_json *json);
 ari_validator ast_ari_validate_endpoint_fn(void);
 
 /*!
+ * \brief Validator for TextMessage.
+ *
+ * A text message.
+ *
+ * \param json JSON object to validate.
+ * \returns True (non-zero) if valid.
+ * \returns False (zero) if invalid.
+ */
+int ast_ari_validate_text_message(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_text_message().
+ *
+ * See \ref ast_ari_model_validators.h for more details.
+ */
+ari_validator ast_ari_validate_text_message_fn(void);
+
+/*!
+ * \brief Validator for TextMessageVariable.
+ *
+ * A key/value pair variable in a text message.
+ *
+ * \param json JSON object to validate.
+ * \returns True (non-zero) if valid.
+ * \returns False (zero) if invalid.
+ */
+int ast_ari_validate_text_message_variable(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_text_message_variable().
+ *
+ * See \ref ast_ari_model_validators.h for more details.
+ */
+ari_validator ast_ari_validate_text_message_variable_fn(void);
+
+/*!
  * \brief Validator for CallerID.
  *
  * Caller identification
@@ -1097,6 +1133,24 @@ int ast_ari_validate_stasis_start(struct ast_json *json);
 ari_validator ast_ari_validate_stasis_start_fn(void);
 
 /*!
+ * \brief Validator for TextMessageReceived.
+ *
+ * A text message was received from an endpoint.
+ *
+ * \param json JSON object to validate.
+ * \returns True (non-zero) if valid.
+ * \returns False (zero) if invalid.
+ */
+int ast_ari_validate_text_message_received(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_text_message_received().
+ *
+ * See \ref ast_ari_model_validators.h for more details.
+ */
+ari_validator ast_ari_validate_text_message_received_fn(void);
+
+/*!
  * \brief Validator for Application.
  *
  * Details of a Stasis application
@@ -1152,6 +1206,14 @@ ari_validator ast_ari_validate_application_fn(void);
  * - resource: string (required)
  * - state: string
  * - technology: string (required)
+ * TextMessage
+ * - body: string (required)
+ * - from: string (required)
+ * - to: string (required)
+ * - variables: List[TextMessageVariable]
+ * TextMessageVariable
+ * - key: string (required)
+ * - value: string (required)
  * CallerID
  * - name: string (required)
  * - number: string (required)
@@ -1405,6 +1467,12 @@ ari_validator ast_ari_validate_application_fn(void);
  * - timestamp: Date
  * - args: List[string] (required)
  * - channel: Channel (required)
+ * TextMessageReceived
+ * - type: string (required)
+ * - application: string (required)
+ * - timestamp: Date
+ * - endpoint: Endpoint
+ * - message: TextMessage (required)
  * Application
  * - bridge_ids: List[string] (required)
  * - channel_ids: List[string] (required)
index cef1e71..d0a1be3 100644 (file)
@@ -723,36 +723,6 @@ void ast_ari_channels_list(struct ast_variable *headers,
        ast_ari_response_ok(response, ast_json_ref(json));
 }
 
-static int json_to_ast_variables(struct ast_json *json_variables, struct ast_variable **variables)
-{
-       struct ast_variable *current = NULL;
-       struct ast_json_iter *it_json_var;
-
-       for (it_json_var = ast_json_object_iter(json_variables); it_json_var;
-                it_json_var = ast_json_object_iter_next(json_variables, it_json_var)) {
-               struct ast_variable *new_var;
-
-               new_var = ast_variable_new(ast_json_object_iter_key(it_json_var),
-                                                                  ast_json_string_get(ast_json_object_iter_value(it_json_var)),
-                                                                  "");
-               if (!new_var) {
-                       ast_variables_destroy(*variables);
-                       *variables = NULL;
-                       return 1;
-               }
-
-               if (!current) {
-                       *variables = new_var;
-                       current = *variables;
-               } else {
-                       current->next = new_var;
-                       current = new_var;
-               }
-       }
-
-       return 0;
-}
-
 static void ari_channels_handle_originate_with_id(const char *args_endpoint,
        const char *args_extension,
        const char *args_context,
@@ -894,7 +864,7 @@ void ast_ari_channels_originate_with_id(struct ast_variable *headers,
                ast_ari_channels_originate_with_id_parse_body(args->variables, args);
                json_variables = ast_json_object_get(args->variables, "variables");
                if (json_variables) {
-                       if (json_to_ast_variables(json_variables, &variables)) {
+                       if (ast_json_to_ast_variables(json_variables, &variables)) {
                                ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to channel variables\n");
                                ast_ari_response_alloc_failed(response);
                                return;
@@ -930,7 +900,7 @@ void ast_ari_channels_originate(struct ast_variable *headers,
                ast_ari_channels_originate_parse_body(args->variables, args);
                json_variables = ast_json_object_get(args->variables, "variables");
                if (json_variables) {
-                       if (json_to_ast_variables(json_variables, &variables)) {
+                       if (ast_json_to_ast_variables(json_variables, &variables)) {
                                ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to channel variables\n");
                                ast_ari_response_alloc_failed(response);
                                return;
index ff2b150..4f91e78 100644 (file)
@@ -174,3 +174,108 @@ void ast_ari_endpoints_get(struct ast_variable *headers,
 
        ast_ari_response_ok(response, json);
 }
+
+static void send_message(const char *to, const char *from, const char *body, struct ast_variable *variables, struct ast_ari_response *response)
+{
+       struct ast_variable *current;
+       struct ast_msg *msg;
+       int res = 0;
+
+       if (ast_strlen_zero(to)) {
+               ast_ari_response_error(response, 400, "Bad Request",
+                       "To must be specified");
+               return;
+       }
+
+       msg = ast_msg_alloc();
+       if (!msg) {
+               ast_ari_response_alloc_failed(response);
+               return;
+       }
+
+       res |= ast_msg_set_from(msg, "%s", from);
+       res |= ast_msg_set_to(msg, "%s", to);
+
+       if (!ast_strlen_zero(body)) {
+               res |= ast_msg_set_body(msg, "%s", body);
+       }
+
+       for (current = variables; current; current = current->next) {
+               res |= ast_msg_set_var_outbound(msg, current->name, current->value);
+       }
+
+       if (res) {
+               ast_ari_response_alloc_failed(response);
+               ast_msg_destroy(msg);
+               return;
+       }
+
+       if (ast_msg_send(msg, to, from)) {
+               ast_ari_response_error(response, 404, "Not Found",
+                       "Endpoint not found");
+       }
+
+       response->message = ast_json_null();
+       response->response_code = 202;
+       response->response_text = "Accepted";
+}
+
+void ast_ari_endpoints_send_message(struct ast_variable *headers,
+       struct ast_ari_endpoints_send_message_args *args,
+       struct ast_ari_response *response)
+{
+       RAII_VAR(struct ast_variable *, variables, NULL, ast_variables_destroy);
+
+       if (args->variables) {
+               struct ast_json *json_variables;
+
+               ast_ari_endpoints_send_message_parse_body(args->variables, args);
+               json_variables = ast_json_object_get(args->variables, "variables");
+               if (json_variables) {
+                       if (ast_json_to_ast_variables(json_variables, &variables)) {
+                               ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to Asterisk variables\n");
+                               ast_ari_response_alloc_failed(response);
+                               return;
+                       }
+               }
+       }
+
+       send_message(args->to, args->from, args->body, variables, response);
+}
+
+void ast_ari_endpoints_send_message_to_endpoint(struct ast_variable *headers,
+       struct ast_ari_endpoints_send_message_to_endpoint_args *args,
+       struct ast_ari_response *response)
+{
+       RAII_VAR(struct ast_variable *, variables, NULL, ast_variables_destroy);
+       RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+       char msg_to[128];
+       char *tech = ast_strdupa(args->tech);
+
+       /* Really, we just want to know if this thing exists */
+       snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource);
+       if (!snapshot) {
+               ast_ari_response_error(response, 404, "Not Found",
+                       "Endpoint not found");
+               return;
+       }
+
+       if (args->variables) {
+               struct ast_json *json_variables;
+
+               ast_ari_endpoints_send_message_to_endpoint_parse_body(args->variables, args);
+               json_variables = ast_json_object_get(args->variables, "variables");
+
+               if (json_variables) {
+                       if (ast_json_to_ast_variables(json_variables, &variables)) {
+                               ast_log(AST_LOG_ERROR, "Unable to convert 'variables' in JSON body to Asterisk variables\n");
+                               ast_ari_response_alloc_failed(response);
+                               return;
+                       }
+               }
+       }
+
+       snprintf(msg_to, sizeof(msg_to), "%s:%s", ast_str_to_lower(tech), args->resource);
+
+       send_message(msg_to, args->from, args->body, variables, response);
+}
index 3af81a6..4391b36 100644 (file)
@@ -50,6 +50,35 @@ struct ast_ari_endpoints_list_args {
  * \param[out] response HTTP response
  */
 void ast_ari_endpoints_list(struct ast_variable *headers, struct ast_ari_endpoints_list_args *args, struct ast_ari_response *response);
+/*! Argument struct for ast_ari_endpoints_send_message() */
+struct ast_ari_endpoints_send_message_args {
+       /*! The endpoint resource or technology specific URI to send the message to. Valid resources are sip, pjsip, and xmpp. */
+       const char *to;
+       /*! The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp. */
+       const char *from;
+       /*! The body of the message */
+       const char *body;
+       struct ast_json *variables;
+};
+/*!
+ * \brief Body parsing function for /endpoints/sendMessage.
+ * \param body The JSON body from which to parse parameters.
+ * \param[out] args The args structure to parse into.
+ * \retval zero on success
+ * \retval non-zero on failure
+ */
+int ast_ari_endpoints_send_message_parse_body(
+       struct ast_json *body,
+       struct ast_ari_endpoints_send_message_args *args);
+
+/*!
+ * \brief Send a message to some technology URI or endpoint.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void ast_ari_endpoints_send_message(struct ast_variable *headers, struct ast_ari_endpoints_send_message_args *args, struct ast_ari_response *response);
 /*! Argument struct for ast_ari_endpoints_list_by_tech() */
 struct ast_ari_endpoints_list_by_tech_args {
        /*! Technology of the endpoints (sip,iax2,...) */
@@ -78,5 +107,36 @@ struct ast_ari_endpoints_get_args {
  * \param[out] response HTTP response
  */
 void ast_ari_endpoints_get(struct ast_variable *headers, struct ast_ari_endpoints_get_args *args, struct ast_ari_response *response);
+/*! Argument struct for ast_ari_endpoints_send_message_to_endpoint() */
+struct ast_ari_endpoints_send_message_to_endpoint_args {
+       /*! Technology of the endpoint */
+       const char *tech;
+       /*! ID of the endpoint */
+       const char *resource;
+       /*! The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp. */
+       const char *from;
+       /*! The body of the message */
+       const char *body;
+       struct ast_json *variables;
+};
+/*!
+ * \brief Body parsing function for /endpoints/{tech}/{resource}/sendMessage.
+ * \param body The JSON body from which to parse parameters.
+ * \param[out] args The args structure to parse into.
+ * \retval zero on success
+ * \retval non-zero on failure
+ */
+int ast_ari_endpoints_send_message_to_endpoint_parse_body(
+       struct ast_json *body,
+       struct ast_ari_endpoints_send_message_to_endpoint_args *args);
+
+/*!
+ * \brief Send a message to some endpoint in a technology.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void ast_ari_endpoints_send_message_to_endpoint(struct ast_variable *headers, struct ast_ari_endpoints_send_message_to_endpoint_args *args, struct ast_ari_response *response);
 
 #endif /* _ASTERISK_RESOURCE_ENDPOINTS_H */
index f973c7a..071d66b 100644 (file)
@@ -102,6 +102,108 @@ static void ast_ari_endpoints_list_cb(
 fin: __attribute__((unused))
        return;
 }
+int ast_ari_endpoints_send_message_parse_body(
+       struct ast_json *body,
+       struct ast_ari_endpoints_send_message_args *args)
+{
+       struct ast_json *field;
+       /* Parse query parameters out of it */
+       field = ast_json_object_get(body, "to");
+       if (field) {
+               args->to = ast_json_string_get(field);
+       }
+       field = ast_json_object_get(body, "from");
+       if (field) {
+               args->from = ast_json_string_get(field);
+       }
+       field = ast_json_object_get(body, "body");
+       if (field) {
+               args->body = ast_json_string_get(field);
+       }
+       return 0;
+}
+
+/*!
+ * \brief Parameter parsing callback for /endpoints/sendMessage.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void ast_ari_endpoints_send_message_cb(
+       struct ast_tcptls_session_instance *ser,
+       struct ast_variable *get_params, struct ast_variable *path_vars,
+       struct ast_variable *headers, struct ast_ari_response *response)
+{
+       struct ast_ari_endpoints_send_message_args args = {};
+       struct ast_variable *i;
+       RAII_VAR(struct ast_json *, body, NULL, ast_json_unref);
+#if defined(AST_DEVMODE)
+       int is_valid;
+       int code;
+#endif /* AST_DEVMODE */
+
+       for (i = get_params; i; i = i->next) {
+               if (strcmp(i->name, "to") == 0) {
+                       args.to = (i->value);
+               } else
+               if (strcmp(i->name, "from") == 0) {
+                       args.from = (i->value);
+               } else
+               if (strcmp(i->name, "body") == 0) {
+                       args.body = (i->value);
+               } else
+               {}
+       }
+       /* Look for a JSON request entity */
+       body = ast_http_get_json(ser, headers);
+       if (!body) {
+               switch (errno) {
+               case EFBIG:
+                       ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large");
+                       goto fin;
+               case ENOMEM:
+                       ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request");
+                       goto fin;
+               case EIO:
+                       ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body");
+                       goto fin;
+               }
+       }
+       args.variables = ast_json_ref(body);
+       ast_ari_endpoints_send_message(headers, &args, response);
+#if defined(AST_DEVMODE)
+       code = response->response_code;
+
+       switch (code) {
+       case 0: /* Implementation is still a stub, or the code wasn't set */
+               is_valid = response->message == NULL;
+               break;
+       case 500: /* Internal Server Error */
+       case 501: /* Not Implemented */
+       case 404: /* Endpoint not found */
+               is_valid = 1;
+               break;
+       default:
+               if (200 <= code && code <= 299) {
+                       is_valid = ast_ari_validate_void(
+                               response->message);
+               } else {
+                       ast_log(LOG_ERROR, "Invalid error response %d for /endpoints/sendMessage\n", code);
+                       is_valid = 0;
+               }
+       }
+
+       if (!is_valid) {
+               ast_log(LOG_ERROR, "Response validation failed for /endpoints/sendMessage\n");
+               ast_ari_response_error(response, 500,
+                       "Internal Server Error", "Response validation failed");
+       }
+#endif /* AST_DEVMODE */
+
+fin: __attribute__((unused))
+       return;
+}
 /*!
  * \brief Parameter parsing callback for /endpoints/{tech}.
  * \param get_params GET parameters in the HTTP request.
@@ -200,6 +302,7 @@ static void ast_ari_endpoints_get_cb(
                break;
        case 500: /* Internal Server Error */
        case 501: /* Not Implemented */
+       case 400: /* Invalid parameters for sending a message. */
        case 404: /* Endpoints not found */
                is_valid = 1;
                break;
@@ -223,16 +326,139 @@ static void ast_ari_endpoints_get_cb(
 fin: __attribute__((unused))
        return;
 }
+int ast_ari_endpoints_send_message_to_endpoint_parse_body(
+       struct ast_json *body,
+       struct ast_ari_endpoints_send_message_to_endpoint_args *args)
+{
+       struct ast_json *field;
+       /* Parse query parameters out of it */
+       field = ast_json_object_get(body, "from");
+       if (field) {
+               args->from = ast_json_string_get(field);
+       }
+       field = ast_json_object_get(body, "body");
+       if (field) {
+               args->body = ast_json_string_get(field);
+       }
+       return 0;
+}
+
+/*!
+ * \brief Parameter parsing callback for /endpoints/{tech}/{resource}/sendMessage.
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param[out] response Response to the HTTP request.
+ */
+static void ast_ari_endpoints_send_message_to_endpoint_cb(
+       struct ast_tcptls_session_instance *ser,
+       struct ast_variable *get_params, struct ast_variable *path_vars,
+       struct ast_variable *headers, struct ast_ari_response *response)
+{
+       struct ast_ari_endpoints_send_message_to_endpoint_args args = {};
+       struct ast_variable *i;
+       RAII_VAR(struct ast_json *, body, NULL, ast_json_unref);
+#if defined(AST_DEVMODE)
+       int is_valid;
+       int code;
+#endif /* AST_DEVMODE */
+
+       for (i = get_params; i; i = i->next) {
+               if (strcmp(i->name, "from") == 0) {
+                       args.from = (i->value);
+               } else
+               if (strcmp(i->name, "body") == 0) {
+                       args.body = (i->value);
+               } else
+               {}
+       }
+       for (i = path_vars; i; i = i->next) {
+               if (strcmp(i->name, "tech") == 0) {
+                       args.tech = (i->value);
+               } else
+               if (strcmp(i->name, "resource") == 0) {
+                       args.resource = (i->value);
+               } else
+               {}
+       }
+       /* Look for a JSON request entity */
+       body = ast_http_get_json(ser, headers);
+       if (!body) {
+               switch (errno) {
+               case EFBIG:
+                       ast_ari_response_error(response, 413, "Request Entity Too Large", "Request body too large");
+                       goto fin;
+               case ENOMEM:
+                       ast_ari_response_error(response, 500, "Internal Server Error", "Error processing request");
+                       goto fin;
+               case EIO:
+                       ast_ari_response_error(response, 400, "Bad Request", "Error parsing request body");
+                       goto fin;
+               }
+       }
+       args.variables = ast_json_ref(body);
+       ast_ari_endpoints_send_message_to_endpoint(headers, &args, response);
+#if defined(AST_DEVMODE)
+       code = response->response_code;
+
+       switch (code) {
+       case 0: /* Implementation is still a stub, or the code wasn't set */
+               is_valid = response->message == NULL;
+               break;
+       case 500: /* Internal Server Error */
+       case 501: /* Not Implemented */
+       case 400: /* Invalid parameters for sending a message. */
+       case 404: /* Endpoint not found */
+               is_valid = 1;
+               break;
+       default:
+               if (200 <= code && code <= 299) {
+                       is_valid = ast_ari_validate_void(
+                               response->message);
+               } else {
+                       ast_log(LOG_ERROR, "Invalid error response %d for /endpoints/{tech}/{resource}/sendMessage\n", code);
+                       is_valid = 0;
+               }
+       }
+
+       if (!is_valid) {
+               ast_log(LOG_ERROR, "Response validation failed for /endpoints/{tech}/{resource}/sendMessage\n");
+               ast_ari_response_error(response, 500,
+                       "Internal Server Error", "Response validation failed");
+       }
+#endif /* AST_DEVMODE */
+
+fin: __attribute__((unused))
+       return;
+}
 
 /*! \brief REST handler for /api-docs/endpoints.{format} */
+static struct stasis_rest_handlers endpoints_sendMessage = {
+       .path_segment = "sendMessage",
+       .callbacks = {
+               [AST_HTTP_PUT] = ast_ari_endpoints_send_message_cb,
+       },
+       .num_children = 0,
+       .children = {  }
+};
+/*! \brief REST handler for /api-docs/endpoints.{format} */
+static struct stasis_rest_handlers endpoints_tech_resource_sendMessage = {
+       .path_segment = "sendMessage",
+       .callbacks = {
+               [AST_HTTP_PUT] = ast_ari_endpoints_send_message_to_endpoint_cb,
+       },
+       .num_children = 0,
+       .children = {  }
+};
+/*! \brief REST handler for /api-docs/endpoints.{format} */
 static struct stasis_rest_handlers endpoints_tech_resource = {
        .path_segment = "resource",
        .is_wildcard = 1,
        .callbacks = {
                [AST_HTTP_GET] = ast_ari_endpoints_get_cb,
        },
-       .num_children = 0,
-       .children = {  }
+       .num_children = 1,
+       .children = { &endpoints_tech_resource_sendMessage, }
 };
 /*! \brief REST handler for /api-docs/endpoints.{format} */
 static struct stasis_rest_handlers endpoints_tech = {
@@ -250,8 +476,8 @@ static struct stasis_rest_handlers endpoints = {
        .callbacks = {
                [AST_HTTP_GET] = ast_ari_endpoints_list_cb,
        },
-       .num_children = 1,
-       .children = { &endpoints_tech, }
+       .num_children = 2,
+       .children = { &endpoints_sendMessage,&endpoints_tech, }
 };
 
 static int load_module(void)
index f802614..db97525 100644 (file)
@@ -47,40 +47,10 @@ const pjsip_method pjsip_message_method = {PJSIP_OTHER_METHOD, {"MESSAGE", 7} };
 
 #define MAX_HDR_SIZE 512
 #define MAX_BODY_SIZE 1024
-#define MAX_EXTEN_SIZE 256
 #define MAX_USER_SIZE 128
 
 /*!
  * \internal
- * \brief Determine where in the dialplan a call should go
- *
- * \details This uses the username in the request URI to try to match
- * an extension in an endpoint's context in order to route the call.
- *
- * \param rdata The SIP request
- * \param context The context to use
- * \param exten The extension to use
- */
-static enum pjsip_status_code get_destination(const pjsip_rx_data *rdata, const char *context, char *exten)
-{
-       pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri;
-       pjsip_sip_uri *sip_ruri;
-
-       if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
-               return PJSIP_SC_UNSUPPORTED_URI_SCHEME;
-       }
-
-       sip_ruri = pjsip_uri_get_uri(ruri);
-       ast_copy_pj_str(exten, &sip_ruri->user, MAX_EXTEN_SIZE);
-
-       if (ast_exists_extension(NULL, context, exten, 1, NULL)) {
-               return PJSIP_SC_OK;
-       }
-       return PJSIP_SC_NOT_FOUND;
-}
-
-/*!
- * \internal
  * \brief Checks to make sure the request has the correct content type.
  *
  * \details This module supports the following media types: "text/plain".
@@ -244,7 +214,6 @@ static void update_from(pjsip_tx_data *tdata, char *from)
                                      PJSIP_PARSE_URI_AS_NAMEADDR))) {
                pjsip_name_addr *parsed_name_addr = (pjsip_name_addr *)parsed;
                pjsip_sip_uri *parsed_uri = pjsip_uri_get_uri(parsed_name_addr->uri);
-
                if (pj_strlen(&parsed_name_addr->display)) {
                        pj_strdup(tdata->pool, &name_addr->display,
                                  &parsed_name_addr->display);
@@ -458,58 +427,62 @@ static char *sip_to_pjsip(char *buf, int size, int capacity)
  */
 static enum pjsip_status_code rx_data_to_ast_msg(pjsip_rx_data *rdata, struct ast_msg *msg)
 {
-
-#define CHECK_RES(z_) do { if (z_) { ast_msg_destroy(msg); \
-               return PJSIP_SC_INTERNAL_SERVER_ERROR; } } while (0)
-
-       int size;
-       char buf[MAX_BODY_SIZE];
+       struct ast_sip_endpoint *endpt = ast_pjsip_rdata_get_endpoint(rdata);
+       pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri;
+       pjsip_sip_uri *sip_ruri;
        pjsip_name_addr *name_addr;
+       char buf[MAX_BODY_SIZE];
        const char *field;
-       pjsip_status_code code;
-       struct ast_sip_endpoint *endpt = ast_pjsip_rdata_get_endpoint(rdata);
        const char *context = S_OR(endpt->message_context, endpt->context);
+       char exten[AST_MAX_EXTENSION];
+       int res = 0;
+       int size;
 
-       /* make sure there is an appropriate context and extension*/
-       if ((code = get_destination(rdata, context, buf)) != PJSIP_SC_OK) {
-               return code;
+       if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
+               return PJSIP_SC_UNSUPPORTED_URI_SCHEME;
        }
 
-       CHECK_RES(ast_msg_set_context(msg, "%s", context));
-       CHECK_RES(ast_msg_set_exten(msg, "%s", buf));
+       sip_ruri = pjsip_uri_get_uri(ruri);
+       ast_copy_pj_str(exten, &sip_ruri->user, AST_MAX_EXTENSION);
+
+       res |= ast_msg_set_context(msg, "%s", context);
+       res |= ast_msg_set_exten(msg, "%s", exten);
 
        /* to header */
        name_addr = (pjsip_name_addr *)rdata->msg_info.to->uri;
-       if ((size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf)-1)) > 0) {
-               buf[size] = '\0';
-               /* prepend the tech */
-               CHECK_RES(ast_msg_set_to(msg, "%s", sip_to_pjsip(buf, ++size, sizeof(buf)-1)));
+       size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf) - 1);
+       if (size <= 0) {
+               return PJSIP_SC_INTERNAL_SERVER_ERROR;
        }
+       buf[size] = '\0';
+       res |= ast_msg_set_to(msg, "%s", sip_to_pjsip(buf, ++size, sizeof(buf) - 1));
 
        /* from header */
        name_addr = (pjsip_name_addr *)rdata->msg_info.from->uri;
-       if ((size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf)-1)) > 0) {
-               buf[size] = '\0';
-               CHECK_RES(ast_msg_set_from(msg, "%s", buf));
+       size = pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, name_addr, buf, sizeof(buf) - 1);
+       if (size <= 0) {
+               return PJSIP_SC_INTERNAL_SERVER_ERROR;
        }
+       buf[size] = '\0';
+       res |= ast_msg_set_from(msg, "%s", buf);
 
-       /* receive address */
-       field = pj_sockaddr_print(&rdata->pkt_info.src_addr, buf, sizeof(buf)-1, 1);
-       CHECK_RES(ast_msg_set_var(msg, "PJSIP_RECVADDR", field));
+       field = pj_sockaddr_print(&rdata->pkt_info.src_addr, buf, sizeof(buf) - 1, 1);
+       res |= ast_msg_set_var(msg, "PJSIP_RECVADDR", field);
 
-       /* body */
        if (print_body(rdata, buf, sizeof(buf) - 1) > 0) {
-               CHECK_RES(ast_msg_set_body(msg, "%s", buf));
+               res |= ast_msg_set_body(msg, "%s", buf);
        }
 
        /* endpoint name */
+       res |= ast_msg_set_tech(msg, "%s", "PJSIP");
+       res |= ast_msg_set_endpoint(msg, "%s", ast_sorcery_object_get_id(endpt));
        if (endpt->id.self.name.valid) {
-               CHECK_RES(ast_msg_set_var(msg, "PJSIP_PEERNAME", endpt->id.self.name.str));
+               res |= ast_msg_set_var(msg, "PJSIP_ENDPOINT", endpt->id.self.name.str);
        }
 
-       CHECK_RES(headers_to_vars(rdata, msg));
+       res |= headers_to_vars(rdata, msg);
 
-       return PJSIP_SC_OK;
+       return !res ? PJSIP_SC_OK : PJSIP_SC_INTERNAL_SERVER_ERROR;
 }
 
 struct msg_data {
@@ -547,17 +520,14 @@ static struct msg_data* msg_data_create(const struct ast_msg *msg, const char *t
                return NULL;
        }
 
-       /* if there is another sip in the uri then we are good,
-          otherwise it needs a sip: in front */
-       mdata->to = to == skip_sip(to) ? ast_strdup(to - 3) :
-               ast_strdup(++to);
+       /* Make sure we start with sip: */
+       mdata->to = ast_begins_with(to, "sip:") ? ast_strdup(++to) : ast_strdup(to - 3);
        mdata->from = ast_strdup(from);
 
        /* sometimes from can still contain the tag at this point, so remove it */
        if ((tag = strchr(mdata->from, ';'))) {
                *tag = '\0';
        }
-
        return mdata;
 }
 
@@ -577,8 +547,8 @@ static int msg_send(void *data)
                         mdata->to, &uri), ao2_cleanup);
 
        if (!endpoint) {
-               ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not find endpoint and "
-                       "no default outbound endpoint configured\n");
+               ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not find endpoint '%s' and "
+                       "no default outbound endpoint configured\n", mdata->to);
                return -1;
        }
 
@@ -598,6 +568,9 @@ static int msg_send(void *data)
 
        vars_to_headers(mdata->msg, tdata);
 
+       ast_debug(1, "Sending message to '%s' (via endpoint %s) from '%s'\n",
+               mdata->to, ast_sorcery_object_get_id(endpoint), mdata->from);
+
        if (ast_sip_send_request(tdata, NULL, endpoint, NULL, NULL)) {
                ast_log(LOG_ERROR, "PJSIP MESSAGE - Could not send request\n");
                return -1;
@@ -670,24 +643,36 @@ static pj_bool_t module_on_rx_request(pjsip_rx_data *rdata)
                return PJ_FALSE;
        }
 
+       code = check_content_type(rdata);
+       if (code != PJSIP_SC_OK) {
+               send_response(rdata, code, NULL, NULL);
+               return PJ_TRUE;
+       }
+
        msg = ast_msg_alloc();
        if (!msg) {
                send_response(rdata, PJSIP_SC_INTERNAL_SERVER_ERROR, NULL, NULL);
                return PJ_TRUE;
        }
 
-       if ((code = check_content_type(rdata)) != PJSIP_SC_OK) {
+       code = rx_data_to_ast_msg(rdata, msg);
+       if (code != PJSIP_SC_OK) {
                send_response(rdata, code, NULL, NULL);
+               ast_msg_destroy(msg);
                return PJ_TRUE;
        }
 
-       if ((code = rx_data_to_ast_msg(rdata, msg)) == PJSIP_SC_OK) {
-               /* send it to the dialplan */
-               ast_msg_queue(msg);
-               code = PJSIP_SC_ACCEPTED;
+       if (!ast_msg_has_destination(msg)) {
+               ast_debug(1, "MESSAGE request received, but no handler wanted it\n");
+               send_response(rdata, PJSIP_SC_NOT_FOUND, NULL, NULL);
+               ast_msg_destroy(msg);
+               return PJ_TRUE;
        }
 
-       send_response(rdata, code, NULL, NULL);
+       /* send it to the messaging core */
+       ast_msg_queue(msg);
+       send_response(rdata, PJSIP_SC_ACCEPTED, NULL, NULL);
+
        return PJ_TRUE;
 }
 
index a64feee..7d53731 100644 (file)
@@ -66,6 +66,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/strings.h"
 #include "stasis/app.h"
 #include "stasis/control.h"
+#include "stasis/messaging.h"
 #include "stasis/stasis_bridge.h"
 #include "asterisk/core_unreal.h"
 #include "asterisk/musiconhold.h"
@@ -1433,6 +1434,8 @@ static int unload_module(void)
 {
        stasis_app_unregister_event_sources();
 
+       messaging_cleanup();
+
        ao2_cleanup(apps_registry);
        apps_registry = NULL;
 
@@ -1495,6 +1498,11 @@ static int load_module(void)
                return AST_MODULE_LOAD_FAILURE;
        }
 
+       if (messaging_init()) {
+               unload_module();
+               return AST_MODULE_LOAD_FAILURE;
+       }
+
        bridge_stasis_init();
 
        stasis_app_register_event_sources();
index f5734ce..b3c3748 100644 (file)
@@ -3174,16 +3174,27 @@ static int xmpp_pak_message(struct ast_xmpp_client *client, struct ast_xmpp_clie
 
        if (ast_test_flag(&cfg->flags, XMPP_SEND_TO_DIALPLAN)) {
                struct ast_msg *msg;
+               struct ast_xmpp_buddy *buddy;
 
                if ((msg = ast_msg_alloc())) {
                        int res;
 
                        ast_xmpp_client_lock(client);
 
+                       buddy = ao2_find(client->buddies, pak->from->partial, OBJ_KEY | OBJ_NOLOCK);
+
                        res = ast_msg_set_to(msg, "xmpp:%s", cfg->user);
                        res |= ast_msg_set_from(msg, "xmpp:%s", message->from);
                        res |= ast_msg_set_body(msg, "%s", message->message);
                        res |= ast_msg_set_context(msg, "%s", cfg->context);
+                       res |= ast_msg_set_tech(msg, "%s", "XMPP");
+                       res |= ast_msg_set_endpoint(msg, "%s", client->name);
+
+                       if (buddy) {
+                               res |= ast_msg_set_var(msg, "XMPP_BUDDY", buddy->id);
+                       }
+
+                       ao2_cleanup(buddy);
 
                        ast_xmpp_client_unlock(client);
 
index 41f6ccf..7e7911b 100644 (file)
@@ -28,6 +28,7 @@
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #include "app.h"
+#include "messaging.h"
 
 #include "asterisk/callerid.h"
 #include "asterisk/stasis_app.h"
@@ -511,6 +512,44 @@ static struct ast_json *simple_endpoint_event(
                "endpoint", json_endpoint);
 }
 
+static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
+{
+       RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+       struct ast_json *json_endpoint;
+       struct stasis_app *app = pvt;
+       char *tech;
+       char *resource;
+
+       tech = ast_strdupa(endpoint_id);
+       resource = strchr(tech, '/');
+       if (resource) {
+               resource[0] = '\0';
+               resource++;
+       }
+
+       if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
+               return -1;
+       }
+
+       snapshot = ast_endpoint_latest_snapshot(tech, resource);
+       if (!snapshot) {
+               return -1;
+       }
+
+       json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
+       if (!json_endpoint) {
+               return -1;
+       }
+
+       app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
+               "type", "TextMessageReceived",
+               "timestamp", ast_json_timeval(ast_tvnow(), NULL),
+               "endpoint", json_endpoint,
+               "message", json_msg));
+
+       return 0;
+}
+
 static void sub_endpoint_update_handler(void *data,
        struct stasis_subscription *sub,
        struct stasis_message *message)
@@ -1018,6 +1057,10 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
                ao2_find(app->forwards, forwards,
                        OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
                        OBJ_NODATA);
+
+               if (!strcmp(kind, "endpoint")) {
+                       messaging_app_unsubscribe_endpoint(app->name, id);
+               }
        }
 
        return 0;
@@ -1148,6 +1191,9 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint
                                return -1;
                        }
                        ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+
+                       /* Subscribe for messages */
+                       messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
                }
 
                ++forwards->interested;
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c
new file mode 100644 (file)
index 0000000..4773085
--- /dev/null
@@ -0,0 +1,531 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan@digium.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/message.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/vector.h"
+#include "asterisk/lock.h"
+#include "asterisk/utils.h"
+#include "asterisk/test.h"
+#include "messaging.h"
+
+/*!
+ * \brief Number of buckets for the \ref endpoint_subscriptions container
+ */
+#define ENDPOINTS_NUM_BUCKETS 127
+
+/*! \brief Storage object for an application */
+struct application_tuple {
+       /*! ao2 ref counted private object to pass to the callback */
+       void *pvt;
+       /*! The callback to call when this application has a message */
+       message_received_cb callback;
+       /*! The name (key) of the application */
+       char app_name[];
+};
+
+/*! \brief A subscription to some endpoint or technology */
+struct message_subscription {
+       /*! The applications that have subscribed to this endpoint or tech */
+       AST_VECTOR(, struct application_tuple *) applications;
+       /*! The name of this endpoint or tech */
+       char token[];
+};
+
+/*! \brief The subscriptions to endpoints */
+static struct ao2_container *endpoint_subscriptions;
+
+/*!
+ * \brief The subscriptions to technologies
+ *
+ * \note These are stored separately from standard endpoints, given how
+ * relatively few of them there are.
+ */
+static AST_VECTOR(,struct message_subscription *) tech_subscriptions;
+
+/*! \brief RWLock for \c tech_subscriptions */
+static ast_rwlock_t tech_subscriptions_lock;
+
+/*! \internal \brief Destructor for \c application_tuple */
+static void application_tuple_dtor(void *obj)
+{
+       struct application_tuple *tuple = obj;
+
+       ao2_cleanup(tuple->pvt);
+}
+
+/*! \internal \brief Constructor for \c application_tuple */
+static struct application_tuple *application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt)
+{
+       struct application_tuple *tuple;
+       size_t size = sizeof(*tuple) + strlen(app_name) + 1;
+
+       ast_assert(callback != NULL);
+
+       tuple = ao2_t_alloc(size, application_tuple_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+       if (!tuple) {
+               return NULL;
+       }
+
+       strcpy(tuple->app_name, app_name); /* Safe */
+       tuple->pvt = ao2_bump(pvt);
+       tuple->callback = callback;
+
+       return tuple;
+}
+
+/*! \internal \brief Destructor for \ref message_subscription */
+static void message_subscription_dtor(void *obj)
+{
+       struct message_subscription *sub = obj;
+       int i;
+
+       for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+               struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+               ao2_cleanup(tuple);
+       }
+       AST_VECTOR_FREE(&sub->applications);
+}
+
+/*! \internal \brief Constructor for \ref message_subscription */
+static struct message_subscription *message_subscription_alloc(const char *token)
+{
+       struct message_subscription *sub;
+       size_t size = sizeof(*sub) + strlen(token) + 1;
+
+       sub = ao2_t_alloc(size, message_subscription_dtor, AO2_ALLOC_OPT_LOCK_RWLOCK);
+       if (!sub) {
+               return NULL;
+       }
+       strcpy(sub->token, token); /* Safe */
+
+       return sub;
+}
+
+/*! AO2 hash function for \ref message_subscription */
+static int message_subscription_hash_cb(const void *obj, const int flags)
+{
+       const struct message_subscription *sub;
+       const char *key;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_KEY:
+               key = obj;
+               break;
+       case OBJ_SEARCH_OBJECT:
+               sub = obj;
+               key = sub->token;
+               break;
+       default:
+               /* Hash can only work on something with a full key. */
+               ast_assert(0);
+               return 0;
+       }
+       return ast_str_hash(key);
+}
+
+/*! AO2 comparison function for \ref message_subscription */
+static int message_subscription_compare_cb(void *obj, void *arg, int flags)
+{
+       const struct message_subscription *object_left = obj;
+       const struct message_subscription *object_right = arg;
+       const char *right_key = arg;
+       int cmp;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_OBJECT:
+               right_key = object_right->token;
+               /* Fall through */
+       case OBJ_SEARCH_KEY:
+               cmp = strcmp(object_left->token, right_key);
+               break;
+       case OBJ_SEARCH_PARTIAL_KEY:
+               /*
+                * We could also use a partial key struct containing a length
+                * so strlen() does not get called for every comparison instead.
+                */
+               cmp = strncmp(object_left->token, right_key, strlen(right_key));
+               break;
+       default:
+               /*
+                * What arg points to is specific to this traversal callback
+                * and has no special meaning to astobj2.
+                */
+               cmp = 0;
+               break;
+       }
+       if (cmp) {
+               return 0;
+       }
+       /*
+        * At this point the traversal callback is identical to a sorted
+        * container.
+        */
+       return CMP_MATCH;
+}
+
+/*! \internal \brief Convert a \c ast_msg To/From URI to a Stasis endpoint name */
+static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len)
+{
+       const char *endpoint = ast_msg_get_endpoint(msg);
+
+       snprintf(buf, len, "%s%s%s", ast_msg_get_tech(msg),
+               ast_strlen_zero(endpoint) ? "" : "/",
+               S_OR(endpoint, ""));
+}
+
+/*! \internal
+ * \brief Callback from the \c message API that determines if we can handle
+ * this message
+ */
+static int has_destination_cb(const struct ast_msg *msg)
+{
+       struct message_subscription *sub;
+       int i;
+       char buf[256];
+
+       msg_to_endpoint(msg, buf, sizeof(buf));
+
+       ast_rwlock_rdlock(&tech_subscriptions_lock);
+       for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+               sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+               if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
+                           || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+                       ast_rwlock_unlock(&tech_subscriptions_lock);
+                       sub = NULL; /* No ref bump! */
+                       goto match;
+               }
+
+       }
+       ast_rwlock_unlock(&tech_subscriptions_lock);
+
+       sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+       if (sub) {
+               goto match;
+       }
+
+       ast_debug(1, "No subscription found for %s\n", buf);
+       return 0;
+
+match:
+       ao2_cleanup(sub);
+       return 1;
+}
+
+static struct ast_json *msg_to_json(struct ast_msg *msg)
+{
+       struct ast_json *json_obj;
+       struct ast_json *json_vars;
+       struct ast_msg_var_iterator *it_vars;
+       const char *name;
+       const char *value;
+
+       it_vars = ast_msg_var_iterator_init(msg);
+       if (!it_vars) {
+               return NULL;
+       }
+
+       json_vars = ast_json_array_create();
+       if (!json_vars) {
+               return NULL;
+       }
+
+       while (ast_msg_var_iterator_next(msg, it_vars, &name, &value)) {
+               struct ast_json *json_tuple;
+
+               json_tuple = ast_json_pack("{s: s}", name, value);
+               if (!json_tuple) {
+                       ast_json_free(json_vars);
+                       return NULL;
+               }
+
+               ast_json_array_append(json_vars, json_tuple);
+               ast_msg_var_unref_current(it_vars);
+       }
+       ast_msg_var_iterator_destroy(it_vars);
+
+       json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}",
+               "from", ast_msg_get_from(msg),
+               "to", ast_msg_get_to(msg),
+               "body", ast_msg_get_body(msg),
+               "variables", json_vars);
+
+       return json_obj;
+}
+
+static int handle_msg_cb(struct ast_msg *msg)
+{
+       struct message_subscription *sub;
+       int i;
+       char buf[256];
+       const char *endpoint_name;
+       struct ast_json *json_msg;
+
+       msg_to_endpoint(msg, buf, sizeof(buf));
+
+       ast_rwlock_rdlock(&tech_subscriptions_lock);
+       for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+               sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+               if (!sub) {
+                       continue;
+               }
+
+               if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+                       ast_rwlock_unlock(&tech_subscriptions_lock);
+                       ao2_bump(sub);
+                       endpoint_name = buf;
+                       goto match;
+               }
+       }
+       ast_rwlock_unlock(&tech_subscriptions_lock);
+
+       sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+       if (sub) {
+               endpoint_name = buf;
+               goto match;
+       }
+
+       return -1;
+
+match:
+       ast_debug(3, "Dispatching message for %s\n", endpoint_name);
+
+       json_msg = msg_to_json(msg);
+       if (!json_msg) {
+               ao2_ref(sub, -1);
+               return -1;
+       }
+
+       for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+               struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+               tuple->callback(endpoint_name, json_msg, tuple->pvt);
+       }
+
+       ast_json_unref(json_msg);
+       ao2_ref(sub, -1);
+       return 0;
+}
+
+struct ast_msg_handler ari_msg_handler = {
+       .name = "ari",
+       .handle_msg = handle_msg_cb,
+       .has_destination = has_destination_cb,
+};
+
+static int messaging_subscription_cmp(struct message_subscription *sub, const char *key)
+{
+       return !strcmp(sub->token, key) ? 1 : 0;
+}
+
+static int application_tuple_cmp(struct application_tuple *item, const char *key)
+{
+       return !strcmp(item->app_name, key) ? 1 : 0;
+}
+
+static int is_app_subscribed(struct message_subscription *sub, const char *app_name)
+{
+       int i;
+
+       for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+               struct application_tuple *tuple;
+
+               tuple = AST_VECTOR_GET(&sub->applications, i);
+               if (tuple && !strcmp(tuple->app_name, app_name)) {
+                       return 1;
+               }
+       }
+
+       return 0;
+}
+
+static struct message_subscription *get_subscription(struct ast_endpoint *endpoint)
+{
+       struct message_subscription *sub = NULL;
+
+       if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+               sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
+       } else {
+               int i;
+
+               ast_rwlock_rdlock(&tech_subscriptions_lock);
+               for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+                       sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+                       if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+                               ao2_bump(sub);
+                               break;
+                       }
+               }
+               ast_rwlock_unlock(&tech_subscriptions_lock);
+       }
+
+       return sub;
+}
+
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
+{
+       RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+
+       endpoint = ast_endpoint_find_by_id(endpoint_id);
+       if (!endpoint) {
+               return;
+       }
+
+       sub = get_subscription(endpoint);
+       if (!sub) {
+               return;
+       }
+
+       ao2_lock(sub);
+       if (!is_app_subscribed(sub, app_name)) {
+               ao2_unlock(sub);
+               return;
+       }
+
+       AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
+       if (AST_VECTOR_SIZE(&sub->applications) == 0) {
+               if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+                       ao2_unlink(endpoint_subscriptions, sub);
+               } else {
+                       ast_rwlock_wrlock(&tech_subscriptions_lock);
+                       AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+                               messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
+                       ast_rwlock_unlock(&tech_subscriptions_lock);
+               }
+       }
+       ao2_unlock(sub);
+       ao2_ref(sub, -1);
+
+       ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+       ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
+               app_name, ast_endpoint_get_id(endpoint));
+}
+
+static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
+{
+       struct message_subscription *sub = get_subscription(endpoint);
+
+       if (sub) {
+               return sub;
+       }
+
+       sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+       if (!sub) {
+               return NULL;
+       }
+
+       if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+               ao2_link(endpoint_subscriptions, sub);
+       } else {
+               ast_rwlock_wrlock(&tech_subscriptions_lock);
+               AST_VECTOR_APPEND(&tech_subscriptions, ao2_bump(sub));
+               ast_rwlock_unlock(&tech_subscriptions_lock);
+       }
+
+       return sub;
+}
+
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
+{
+       RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+       struct application_tuple *tuple;
+
+       sub = get_or_create_subscription(endpoint);
+       if (!sub) {
+               return -1;
+       }
+
+       ao2_lock(sub);
+       if (is_app_subscribed(sub, app_name)) {
+               ao2_unlock(sub);
+               return 0;
+       }
+
+       tuple = application_tuple_alloc(app_name, callback, pvt);
+       if (!tuple) {
+               ao2_unlock(sub);
+               return -1;
+       }
+       AST_VECTOR_APPEND(&sub->applications, tuple);
+       ao2_unlock(sub);
+
+       ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+       ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
+               app_name, ast_endpoint_get_id(endpoint));
+
+       return 0;
+}
+
+
+int messaging_cleanup(void)
+{
+       ast_msg_handler_unregister(&ari_msg_handler);
+       ao2_ref(endpoint_subscriptions, -1);
+       AST_VECTOR_FREE(&tech_subscriptions);
+       ast_rwlock_destroy(&tech_subscriptions_lock);\
+
+       return 0;
+}
+
+int messaging_init(void)
+{
+       endpoint_subscriptions = ao2_t_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
+               ENDPOINTS_NUM_BUCKETS, message_subscription_hash_cb, NULL,
+               message_subscription_compare_cb, "Endpoint messaging subscription container creation");
+       if (!endpoint_subscriptions) {
+               return -1;
+       }
+
+       if (AST_VECTOR_INIT(&tech_subscriptions, 4)) {
+               ao2_ref(endpoint_subscriptions, -1);
+               return -1;
+       }
+
+       if (ast_rwlock_init(&tech_subscriptions_lock)) {
+               ao2_ref(endpoint_subscriptions, -1);
+               AST_VECTOR_FREE(&tech_subscriptions);
+               return -1;
+       }
+
+       if (ast_msg_handler_register(&ari_msg_handler)) {
+               ao2_ref(endpoint_subscriptions, -1);
+               AST_VECTOR_FREE(&tech_subscriptions);
+               ast_rwlock_destroy(&tech_subscriptions_lock);
+               return -1;
+       }
+
+       return 0;
+}
diff --git a/res/stasis/messaging.h b/res/stasis/messaging.h
new file mode 100644 (file)
index 0000000..c69d5d5
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_RES_STASIS_MESSAGING_H
+#define _ASTERISK_RES_STASIS_MESSAGING_H
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan@digium.com>
+ * \since 12.4.0
+ */
+
+/*!
+ * \brief Callback handler for when a message is received from the core
+ *
+ * \param endpoint_id The ID of the endpoint that we received the message from
+ * \param json_msg JSON representation of the text message
+ * \param pvt ao2 ref counted pvt passed during registration
+ *
+ * \retval 0 the message was handled
+ * \retval non-zero the message was not handled
+ */
+typedef int (* message_received_cb)(const char *endpoint_id, struct ast_json *json_msg, void *pvt);
+
+/*!
+ * \brief Subscribe for messages from a particular endpoint
+ *
+ * \param app_name Name of the stasis application to unsubscribe from messaging
+ * \param endpoint_id The ID of the endpoint we no longer care about
+ *
+ * \retval 0 success
+ * \retval -1 error
+ */
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id);
+
+/*!
+ * \brief Subscribe an application to an endpoint for messages
+ *
+ * \param app_name The name of the \ref stasis application to subscribe to \c endpoint
+ * \param endpoint The endpoint object to subscribe to
+ * \param message_received_cb The callback to call when a message is received
+ * \param pvt An ao2 ref counted object that will be passed to the callback.
+ *
+ * \retval 0 subscription was successful
+ * \retval -1 subscription failed
+ */
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt);
+
+/*!
+ * \brief Tidy up the messaging layer
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_cleanup(void);
+
+/*!
+ * \brief Initialize the messaging layer
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_init(void);
+
+#endif /* #define _ASTERISK_RES_STASIS_MESSAGING_H  */
\ No newline at end of file
index 12b9f2e..31da643 100644 (file)
                        ]
                },
                {
+                       "path": "/endpoints/sendMessage",
+                       "description": "Send a message to some technology URI or endpoint.",
+                       "operations": [
+                               {
+                                       "httpMethod": "PUT",
+                                       "summary": "Send a message to some technology URI or endpoint.",
+                                       "nickname": "sendMessage",
+                                       "responseClass": "void",
+                                       "parameters": [
+                                               {
+                                                       "name": "to",
+                                                       "description": "The endpoint resource or technology specific URI to send the message to. Valid resources are sip, pjsip, and xmpp.",
+                                                       "paramType": "query",
+                                                       "required": true,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "from",
+                                                       "description": "The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp.",
+                                                       "paramType": "query",
+                                                       "required": true,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "body",
+                                                       "description": "The body of the message",
+                                                       "paramType": "query",
+                                                       "required": false,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "variables",
+                                                       "descriptioni": "The \"variables\" key in the body object holds technology specific key/value pairs to append to the message. These can be interpreted and used by the various resource types; for example, pjsip and sip resource types will add the key/value pairs as SIP headers,",
+                                                       "paramType": "body",
+                                                       "required": false,
+                                                       "dataType": "containers",
+                                                       "allowMultiple": false
+                                               }
+                                       ],
+                                       "errorResponses": [
+                                               {
+                                                       "code": 404,
+                                                       "reason": "Endpoint not found"
+                                               }
+                                       ]
+                               }
+                       ]
+               },
+               {
                        "path": "/endpoints/{tech}",
                        "description": "Asterisk endpoints",
                        "operations": [
                                        ],
                                        "errorResponses": [
                                                {
+                                                       "code": 400,
+                                                       "reason": "Invalid parameters for sending a message."
+                                               },
+                                               {
                                                        "code": 404,
                                                        "reason": "Endpoints not found"
                                                }
                                        ]
                                }
                        ]
+               },
+               {
+                       "path": "/endpoints/{tech}/{resource}/sendMessage",
+                       "description": "Send a message to some endpoint in a technology.",
+                       "operations": [
+                               {
+                                       "httpMethod": "PUT",
+                                       "summary": "Send a message to some endpoint in a technology.",
+                                       "nickname": "sendMessageToEndpoint",
+                                       "responseClass": "void",
+                                       "parameters": [
+                                               {
+                                                       "name": "tech",
+                                                       "description": "Technology of the endpoint",
+                                                       "paramType": "path",
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "resource",
+                                                       "description": "ID of the endpoint",
+                                                       "paramType": "path",
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "from",
+                                                       "description": "The endpoint resource or technology specific identity to send this message from. Valid resources are sip, pjsip, and xmpp.",
+                                                       "paramType": "query",
+                                                       "required": true,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "body",
+                                                       "description": "The body of the message",
+                                                       "paramType": "query",
+                                                       "required": false,
+                                                       "allowMultiple": false,
+                                                       "dataType": "string"
+                                               },
+                                               {
+                                                       "name": "variables",
+                                                       "descriptioni": "The \"variables\" key in the body object holds technology specific key/value pairs to append to the message. These can be interpreted and used by the various resource types; for example, pjsip and sip resource types will add the key/value pairs as SIP headers,",
+                                                       "paramType": "body",
+                                                       "required": false,
+                                                       "dataType": "containers",
+                                                       "allowMultiple": false
+                                               }
+                                       ],
+                                       "errorResponses": [
+                                               {
+                                                       "code": 400,
+                                                       "reason": "Invalid parameters for sending a message."
+                                               },
+                                               {
+                                                       "code": 404,
+                                                       "reason": "Endpoint not found"
+                                               }
+                                       ]
+                               }
+                       ]
                }
        ],
        "models": {
                                        "required": true
                                }
                        }
+               },
+               "TextMessageVariable": {
+                       "id": "TextMessageVariable",
+                       "description": "A key/value pair variable in a text message.",
+                       "properties": {
+                               "key": {
+                                       "type": "string",
+                                       "description": "A unique key identifying the variable.",
+                                       "required": true
+                               },
+                               "value": {
+                                       "type": "string",
+                                       "description": "The value of the variable.",
+                                       "required": true
+                               }
+                       }
+               },
+               "TextMessage": {
+                       "id": "TextMessage",
+                       "description": "A text message.",
+                       "properties": {
+                               "from": {
+                                       "type": "string",
+                                       "description": "A technology specific URI specifying the source of the message. For sip and pjsip technologies, any SIP URI can be specified. For xmpp, the URI must correspond to the client connection being used to send the message.",
+                                       "required": true
+                               },
+                               "to": {
+                                       "type": "string",
+                                       "description": "A technology specific URI specifying the destination of the message. Valid technologies include sip, pjsip, and xmp. The destination of a message should be an endpoint.",
+                                       "required": true
+                               },
+                               "body": {
+                                       "type": "string",
+                                       "description": "The text of the message.",
+                                       "required": true
+                               },
+                               "variables": {
+                                       "type": "List[TextMessageVariable]",
+                                       "description": "Technology specific key/value pairs associated with the message.",
+                                       "required": false
+                               }
+                       }
                }
        }
 }
index cf93f4b..5e115fb 100644 (file)
                                "EndpointStateChange",
                                "Dial",
                                "StasisEnd",
-                               "StasisStart"
+                               "StasisStart",
+                               "TextMessageReceived"
                        ]
                },
                "DeviceStateChanged": {
                                        "type": "Channel"
                                }
                        }
+               },
+               "TextMessageReceived": {
+                       "id": "TextMessageReceived",
+                       "description": "A text message was received from an endpoint.",
+                       "properties": {
+                               "message": {
+                                       "required": true,
+                                       "type": "TextMessage"
+                               },
+                               "endpoint": {
+                                       "required": false,
+                                       "type": "Endpoint"
+                               }
+                       }
                }
        }
 }
diff --git a/tests/test_message.c b/tests/test_message.c
new file mode 100644 (file)
index 0000000..13e24a8
--- /dev/null
@@ -0,0 +1,888 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Test module for out-of-call text message module
+ *
+ * \author \verbatim Matt Jordan <mjordan@digium.com> \endverbatim
+ *
+ * \ingroup tests
+ */
+
+/*** MODULEINFO
+       <depend>TEST_FRAMEWORK</depend>
+       <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include <regex.h>
+
+#include "asterisk/module.h"
+#include "asterisk/test.h"
+#include "asterisk/message.h"
+#include "asterisk/pbx.h"
+#include "asterisk/manager.h"
+#include "asterisk/vector.h"
+
+#define TEST_CATEGORY "/main/message/"
+
+#define TEST_CONTEXT "__TEST_MESSAGE_CONTEXT__"
+#define TEST_EXTENSION "test_message_extension"
+
+/*! \brief The number of user events we should get in a dialplan test */
+#define DEFAULT_EXPECTED_EVENTS 4
+
+static struct ast_context *test_message_context;
+
+/*! \brief The current number of received user events */
+static int received_user_events;
+
+/*! \brief The number of user events we expect for this test */
+static int expected_user_events;
+
+/*! \brief Predicate for the \ref test_message_handler receiving a message */
+static int handler_received_message;
+
+/*! \brief Condition wait variable for all dialplan user events being received */
+static ast_cond_t user_event_cond;
+
+/*! \brief Mutex for \c user_event_cond */
+AST_MUTEX_DEFINE_STATIC(user_event_lock);
+
+/*! \brief Condition wait variable for \ref test_msg_handler receiving message */
+static ast_cond_t handler_cond;
+
+/*! \brief Mutex for \c handler_cond */
+AST_MUTEX_DEFINE_STATIC(handler_lock);
+
+/*! \brief The expected user event fields */
+AST_VECTOR(var_vector, struct ast_variable *) expected_user_event_fields;
+
+/*! \brief If a user event fails, the bad headers that didn't match */
+AST_VECTOR(, struct ast_variable *) bad_headers;
+
+static int test_msg_send(const struct ast_msg *msg, const char *to, const char *from);
+
+static struct ast_msg_tech test_msg_tech = {
+       .name = "testmsg",
+       .msg_send = test_msg_send,
+};
+
+static int test_msg_handle_msg_cb(struct ast_msg *msg);
+static int test_msg_has_destination_cb(const struct ast_msg *msg);
+
+/*! \brief Our test message handler */
+static struct ast_msg_handler test_msg_handler = {
+       .name = "testmsg",
+       .handle_msg = test_msg_handle_msg_cb,
+       .has_destination = test_msg_has_destination_cb,
+};
+
+static int user_event_hook_cb(int category, const char *event, char *body);
+
+/*! \brief AMI event hook that verifies whether or not we've gotten our user events */
+static struct manager_custom_hook user_event_hook = {
+       .file = AST_MODULE,
+       .helper = user_event_hook_cb,
+};
+
+/*!
+ * \brief Verifies a user event header/value pair
+ *
+ * \param user_event which user event to check
+ * \param header The header to verify
+ * \param value The value read from the event
+ *
+ * \retval -1 on error or evaluation failure
+ * \retval 0 if match not needed or success
+ */
+static int verify_user_event_fields(int user_event, const char *header, const char *value)
+{
+       struct ast_variable *current;
+       struct ast_variable *expected;
+       regex_t regexbuf;
+       int error;
+
+       if (user_event >= AST_VECTOR_SIZE(&expected_user_event_fields)) {
+               return -1;
+       }
+
+       expected = AST_VECTOR_GET(&expected_user_event_fields, user_event);
+       if (!expected) {
+               return -1;
+       }
+
+       for (current = expected; current; current = current->next) {
+               struct ast_variable *bad_header;
+
+               if (strcmp(current->name, header)) {
+                       continue;
+               }
+
+               error = regcomp(&regexbuf, current->value, REG_EXTENDED | REG_NOSUB);
+               if (error) {
+                       char error_buf[128];
+                       regerror(error, &regexbuf, error_buf, sizeof(error_buf));
+                       ast_log(LOG_ERROR, "Failed to compile regex '%s' for header check '%s': %s\n",
+                               current->value, current->name, error_buf);
+                       return -1;
+               }
+
+               if (!regexec(&regexbuf, value, 0, NULL, 0)) {
+                       regfree(&regexbuf);
+                       return 0;
+               }
+
+               bad_header = ast_variable_new(header, value, __FILE__);
+               if (bad_header) {
+                       struct ast_variable *bad_headers_head = NULL;
+
+                       if (user_event < AST_VECTOR_SIZE(&bad_headers)) {
+                               bad_headers_head = AST_VECTOR_GET(&bad_headers, user_event);
+                       }
+                       ast_variable_list_append(&bad_headers_head, bad_header);
+                       AST_VECTOR_INSERT(&bad_headers, user_event, bad_headers_head);
+               }
+               regfree(&regexbuf);
+               return -1;
+       }
+
+       return 0;
+}
+
+static int message_received;
+
+static int test_msg_send(const struct ast_msg *msg, const char *to, const char *from)
+{
+       message_received = 1;
+
+       return 0;
+}
+
+static int test_msg_handle_msg_cb(struct ast_msg *msg)
+{
+       ast_mutex_lock(&handler_lock);
+       handler_received_message = 1;
+       ast_cond_signal(&handler_cond);
+       ast_mutex_unlock(&handler_lock);
+
+       return 0;
+}
+
+static int test_msg_has_destination_cb(const struct ast_msg *msg)
+{
+       /* We only care about one destination: foo! */
+       if (ast_strlen_zero(ast_msg_get_to(msg))) {
+               return 0;
+       }
+       return (!strcmp(ast_msg_get_to(msg), "foo") ? 1 : 0);
+}
+
+static int user_event_hook_cb(int category, const char *event, char *body)
+{
+       char *parse;
+       char *kvp;
+
+       if (strcmp(event, "UserEvent")) {
+               return -1;
+       }
+
+       parse = ast_strdupa(body);
+       while ((kvp = strsep(&parse, "\r\n"))) {
+               char *key, *value;
+
+               kvp = ast_trim_blanks(kvp);
+               if (ast_strlen_zero(kvp)) {
+                       continue;
+               }
+               key = strsep(&kvp, ":");
+               value = ast_skip_blanks(kvp);
+               verify_user_event_fields(received_user_events, key, value);
+       }
+
+       received_user_events++;
+
+       ast_mutex_lock(&user_event_lock);
+       if (received_user_events == expected_user_events) {
+               ast_cond_signal(&user_event_cond);
+       }
+       ast_mutex_unlock(&user_event_lock);
+
+       return 0;
+}
+
+/*! \brief Wait for the \ref test_msg_handler to receive the message */
+static int handler_wait_for_message(struct ast_test *test)
+{
+       int error = 0;
+       struct timeval wait_now = ast_tvnow();
+       struct timespec wait_time = { .tv_sec = wait_now.tv_sec + 1, .tv_nsec = wait_now.tv_usec * 1000 };
+
+       ast_mutex_lock(&handler_lock);
+       while (!handler_received_message) {
+               error = ast_cond_timedwait(&handler_cond, &handler_lock, &wait_time);
+               if (error == ETIMEDOUT) {
+                       ast_test_status_update(test, "Test timed out while waiting for handler to get message\n");
+                       ast_test_set_result(test, AST_TEST_FAIL);
+                       break;
+               }
+       }
+       ast_mutex_unlock(&handler_lock);
+
+       return (error != ETIMEDOUT);
+}
+
+/*! \brief Wait for the expected number of user events to be received */
+static int user_event_wait_for_events(struct ast_test *test, int expected_events)
+{
+       int error;
+       struct timeval wait_now = ast_tvnow();
+       struct timespec wait_time = { .tv_sec = wait_now.tv_sec + 1, .tv_nsec = wait_now.tv_usec * 1000 };
+
+       expected_user_events = expected_events;
+
+       ast_mutex_lock(&user_event_lock);
+       while (received_user_events != expected_user_events) {
+               error = ast_cond_timedwait(&user_event_cond, &user_event_lock, &wait_time);
+               if (error == ETIMEDOUT) {
+                       ast_test_status_update(test, "Test timed out while waiting for %d expected user events\n", expected_events);
+                       ast_test_set_result(test, AST_TEST_FAIL);
+                       break;
+               }
+       }
+       ast_mutex_unlock(&user_event_lock);
+
+       ast_test_status_update(test, "Received %d of %d user events\n", received_user_events, expected_events);
+       return !(received_user_events == expected_events);
+}
+
+static int verify_bad_headers(struct ast_test *test)
+{
+       int res = 0;
+       int i;
+
+       for (i = 0; i < AST_VECTOR_SIZE(&bad_headers); i++) {
+               struct ast_variable *headers;
+               struct ast_variable *current;
+
+               headers = AST_VECTOR_GET(&bad_headers, i);
+               if (!headers) {
+                       continue;
+               }
+
+               res = -1;
+               for (current = headers; current; current = current->next) {
+                       ast_test_status_update(test, "Expected UserEvent %d: Failed to match %s: %s\n",
+                               i, current->name, current->value);
+                       ast_test_set_result(test, AST_TEST_FAIL);
+               }
+       }
+
+       return res;
+}
+
+AST_TEST_DEFINE(test_message_msg_tech_registration)
+{
+       int reg_result;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test register/unregister of a message tech";
+               info->description =
+                       "Test that:\n"
+                       "\tA message technology can be registered once only\n"
+                       "\tA registered message technology can be unregistered once only\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       reg_result = ast_msg_tech_register(&test_msg_tech);
+       ast_test_validate(test, reg_result == 0);
+
+       reg_result = ast_msg_tech_register(&test_msg_tech);
+       ast_test_validate(test, reg_result == -1);
+
+       reg_result = ast_msg_tech_unregister(&test_msg_tech);
+       ast_test_validate(test, reg_result == 0);
+
+       reg_result = ast_msg_tech_unregister(&test_msg_tech);
+       ast_test_validate(test, reg_result == -1);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_msg_handler_registration)
+{
+       int reg_result;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test register/unregister of a message handler";
+               info->description =
+                       "Test that:\n"
+                       "\tA message handler can be registered once only\n"
+                       "\tA registered message handler can be unregistered once only\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       reg_result = ast_msg_handler_register(&test_msg_handler);
+       ast_test_validate(test, reg_result == 0);
+
+       reg_result = ast_msg_handler_register(&test_msg_handler);
+       ast_test_validate(test, reg_result == -1);
+
+       reg_result = ast_msg_handler_unregister(&test_msg_handler);
+       ast_test_validate(test, reg_result == 0);
+
+       reg_result = ast_msg_handler_unregister(&test_msg_handler);
+       ast_test_validate(test, reg_result == -1);
+
+       return AST_TEST_PASS;
+}
+
+static void ast_msg_safe_destroy(void *obj)
+{
+       struct ast_msg *msg = obj;
+
+       if (msg) {
+               ast_msg_destroy(msg);
+       }
+}
+
+AST_TEST_DEFINE(test_message_manipulation)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+       RAII_VAR(struct ast_msg_var_iterator *, it_vars, NULL, ast_msg_var_iterator_destroy);
+       int result;
+       const char *actual;
+       const char *out_name;
+       const char *out_value;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test manipulating properties of a message";
+               info->description =
+                       "This test covers the following:\n"
+                       "\tSetting/getting the body\n"
+                       "\tSetting/getting inbound/outbound variables\n"
+                       "\tIterating over variables\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       /* Test setting/getting to */
+       result = ast_msg_set_to(msg, "testmsg:%s", "foo");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_to(msg);
+       ast_test_validate(test, !strcmp(actual, "testmsg:foo"));
+
+       /* Test setting/getting from */
+       result = ast_msg_set_from(msg, "testmsg:%s", "bar");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_from(msg);
+       ast_test_validate(test, !strcmp(actual, "testmsg:bar"));
+
+       /* Test setting/getting body */
+       result = ast_msg_set_body(msg, "BodyTest: %s", "foo");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_body(msg);
+       ast_test_validate(test, !strcmp(actual, "BodyTest: foo"));
+
+       /* Test setting/getting technology */
+       result = ast_msg_set_tech(msg, "%s", "my_tech");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_tech(msg);
+       ast_test_validate(test, !strcmp(actual, "my_tech"));
+
+       /* Test setting/getting endpoint */
+       result = ast_msg_set_endpoint(msg, "%s", "terminus");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_endpoint(msg);
+       ast_test_validate(test, !strcmp(actual, "terminus"));
+
+       /* Test setting/getting non-outbound variable */
+       result = ast_msg_set_var(msg, "foo", "bar");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_var(msg, "foo");
+       ast_test_validate(test, !strcmp(actual, "bar"));
+
+       /* Test updating existing variable */
+       result = ast_msg_set_var(msg, "foo", "new_bar");
+       ast_test_validate(test, result == 0);
+       actual = ast_msg_get_var(msg, "foo");
+       ast_test_validate(test, !strcmp(actual, "new_bar"));
+
+       /* Verify a non-outbound variable is not iterable */
+       it_vars = ast_msg_var_iterator_init(msg);
+       ast_test_validate(test, it_vars != NULL);
+       ast_test_validate(test, ast_msg_var_iterator_next(msg, it_vars, &out_name, &out_value) == 0);
+       ast_msg_var_iterator_destroy(it_vars);
+
+       /* Test updating an existing variable as an outbound variable */
+       result = ast_msg_set_var_outbound(msg, "foo", "outbound_bar");
+       ast_test_validate(test, result == 0);
+       it_vars = ast_msg_var_iterator_init(msg);
+       ast_test_validate(test, it_vars != NULL);
+       result = ast_msg_var_iterator_next(msg, it_vars, &out_name, &out_value);
+       ast_test_validate(test, result == 1);
+       ast_test_validate(test, !strcmp(out_name, "foo"));
+       ast_test_validate(test, !strcmp(out_value, "outbound_bar"));
+       ast_msg_var_unref_current(it_vars);
+       result = ast_msg_var_iterator_next(msg, it_vars, &out_name, &out_value);
+       ast_test_validate(test, result == 0);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_queue_dialplan_nominal)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+       struct ast_variable *expected;
+       struct ast_variable *expected_response = NULL;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test enqueueing messages to the dialplan";
+               info->description =
+                       "Test that a message enqueued for the dialplan is\n"
+                       "passed to that particular extension\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       expected = ast_variable_new("Verify","^To$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value","^foo$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 0, expected_response);
+
+       expected_response = NULL;
+       expected = ast_variable_new("Verify", "^From$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value","^bar$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 1, expected_response);
+
+       expected_response = NULL;
+       expected = ast_variable_new("Verify", "^Body$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value", "^a body$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 2, expected_response);
+
+       expected_response = NULL;
+       expected = ast_variable_new("Verify", "^Custom$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value", "^field$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 3, expected_response);
+
+       ast_msg_set_to(msg, "foo");
+       ast_msg_set_from(msg, "bar");
+       ast_msg_set_body(msg, "a body");
+       ast_msg_set_var_outbound(msg, "custom_data", "field");
+
+       ast_msg_set_context(msg, TEST_CONTEXT);
+       ast_msg_set_exten(msg, TEST_EXTENSION);
+
+       ast_msg_queue(msg);
+       msg = NULL;
+
+       if (user_event_wait_for_events(test, DEFAULT_EXPECTED_EVENTS)) {
+               ast_test_status_update(test, "Failed to received %d expected user events\n", DEFAULT_EXPECTED_EVENTS);
+               return AST_TEST_FAIL;
+       }
+
+       if (verify_bad_headers(test)) {
+               return AST_TEST_FAIL;
+       }
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_queue_handler_nominal)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+       int result;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test enqueueing messages to a handler";
+               info->description =
+                       "Test that a message enqueued can be handled by a\n"
+                       "non-dialplan handler\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       result = ast_msg_handler_register(&test_msg_handler);
+       ast_test_validate(test, result == 0);
+
+       ast_msg_set_to(msg, "foo");
+       ast_msg_set_from(msg, "bar");
+       ast_msg_set_body(msg, "a body");
+
+       ast_msg_queue(msg);
+       msg = NULL;
+
+       /* This will automatically fail the test if we don't get the message */
+       handler_wait_for_message(test);
+
+       result = ast_msg_handler_unregister(&test_msg_handler);
+       ast_test_validate(test, result == 0);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_queue_both_nominal)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+       struct ast_variable *expected;
+       struct ast_variable *expected_response = NULL;
+       int result;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test enqueueing messages to a dialplan and custom handler";
+               info->description =
+                       "Test that a message enqueued is passed to all\n"
+                       "handlers that can process it, dialplan as well as\n"
+                       "a custom handler\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       result = ast_msg_handler_register(&test_msg_handler);
+       ast_test_validate(test, result == 0);
+
+       expected = ast_variable_new("Verify","^To$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value","^foo$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 0, expected_response);
+
+       expected_response = NULL;
+       expected = ast_variable_new("Verify", "^From$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value","^bar$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 1, expected_response);
+
+       expected_response = NULL;
+       expected = ast_variable_new("Verify", "^Body$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       expected = ast_variable_new("Value", "^a body$", __FILE__);
+       ast_variable_list_append(&expected_response, expected);
+       AST_VECTOR_INSERT(&expected_user_event_fields, 2, expected_response);
+
+       ast_msg_set_to(msg, "foo");
+       ast_msg_set_from(msg, "bar");
+       ast_msg_set_body(msg, "a body");
+
+       ast_msg_set_context(msg, TEST_CONTEXT);
+       ast_msg_set_exten(msg, TEST_EXTENSION);
+
+       ast_msg_queue(msg);
+       msg = NULL;
+
+       if (user_event_wait_for_events(test, DEFAULT_EXPECTED_EVENTS)) {
+               ast_test_status_update(test, "Failed to received %d expected user events\n", DEFAULT_EXPECTED_EVENTS);
+               ast_test_set_result(test, AST_TEST_FAIL);
+       }
+
+       /* This will automatically fail the test if we don't get the message */
+       handler_wait_for_message(test);
+
+       result = ast_msg_handler_unregister(&test_msg_handler);
+       ast_test_validate(test, result == 0);
+
+       if (verify_bad_headers(test)) {
+               return AST_TEST_FAIL;
+       }
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_has_destination_dialplan)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test checking for a dialplan destination";
+               info->description =
+                       "Test that a message's destination is verified via the\n"
+                       "dialplan\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       ast_msg_set_context(msg, TEST_CONTEXT);
+       ast_msg_set_exten(msg, TEST_EXTENSION);
+       ast_test_validate(test, ast_msg_has_destination(msg) == 1);
+
+       ast_msg_set_context(msg, "__I_SHOULD_NOT_EXIST_PLZ__");
+       ast_test_validate(test, ast_msg_has_destination(msg) == 0);
+
+       ast_msg_set_context(msg, TEST_CONTEXT);
+       ast_msg_set_exten(msg, "__I_SHOULD_NOT_EXIST_PLZ__");
+       ast_test_validate(test, ast_msg_has_destination(msg) == 0);
+
+       ast_msg_set_exten(msg, NULL);
+       ast_test_validate(test, ast_msg_has_destination(msg) == 0);
+
+       ast_msg_set_context(msg, NULL);
+       ast_msg_set_exten(msg, TEST_EXTENSION);
+       ast_test_validate(test, ast_msg_has_destination(msg) == 0);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_has_destination_handler)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+       int result;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test checking for a handler destination";
+               info->description =
+                       "Test that a message's destination is verified via a\n"
+                       "handler\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       result = ast_msg_handler_register(&test_msg_handler);
+       ast_test_validate(test, result == 0);
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       ast_msg_set_to(msg, "foo");
+       ast_msg_set_context(msg, TEST_CONTEXT);
+       ast_msg_set_exten(msg, NULL);
+       ast_test_validate(test, ast_msg_has_destination(msg) == 1);
+
+       ast_msg_set_context(msg, NULL);
+       ast_test_validate(test, ast_msg_has_destination(msg) == 1);
+
+       ast_msg_set_to(msg, "__I_SHOULD_NOT_EXIST_PLZ__");
+       ast_test_validate(test, ast_msg_has_destination(msg) == 0);
+
+       result = ast_msg_handler_unregister(&test_msg_handler);
+       ast_test_validate(test, result == 0);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(test_message_msg_send)
+{
+       RAII_VAR(struct ast_msg *, msg, NULL, ast_msg_safe_destroy);
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = TEST_CATEGORY;
+               info->summary = "Test message routing";
+               info->description =
+                       "Test that a message can be routed if it has\n"
+                       "a valid handler\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       ast_test_validate(test, ast_msg_tech_register(&test_msg_tech) == 0);
+       ast_test_validate(test, ast_msg_handler_register(&test_msg_handler) == 0);
+
+       msg = ast_msg_alloc();
+       ast_test_validate(test, msg != NULL);
+
+       ast_msg_set_to(msg, "foo");
+       ast_msg_set_context(msg, TEST_CONTEXT);
+       ast_msg_set_exten(msg, NULL);
+       ast_test_validate(test, ast_msg_has_destination(msg) == 1);
+
+       if (!ast_msg_send(msg, "testmsg:foo", "blah")) {
+               msg = NULL;
+       } else {
+               ast_test_status_update(test, "Failed to send message\n");
+               ast_test_set_result(test, AST_TEST_FAIL);
+       }
+
+       ast_test_validate(test, ast_msg_handler_unregister(&test_msg_handler) == 0);
+       ast_test_validate(test, ast_msg_tech_unregister(&test_msg_tech) == 0);
+
+       return AST_TEST_PASS;
+}
+
+static int test_init_cb(struct ast_test_info *info, struct ast_test *test)
+{
+       received_user_events = 0;
+       handler_received_message = 0;
+       message_received = 0;
+
+       AST_VECTOR_INIT(&expected_user_event_fields, DEFAULT_EXPECTED_EVENTS);
+       AST_VECTOR_INIT(&bad_headers, DEFAULT_EXPECTED_EVENTS);
+
+       return 0;
+}
+
+#define FREE_VARIABLE_VECTOR(vector) do { \
+       int i; \
+       for (i = 0; i < AST_VECTOR_SIZE(&(vector)); i++) { \
+               struct ast_variable *headers; \
+               headers = AST_VECTOR_GET(&(vector), i); \
+               if (!headers) { \
+                       continue; \
+               } \
+               ast_variables_destroy(headers); \
+       } \
+       AST_VECTOR_FREE(&(vector)); \
+       } while (0)
+
+
+static int test_cleanup_cb(struct ast_test_info *info, struct ast_test *test)
+{
+       FREE_VARIABLE_VECTOR(expected_user_event_fields);
+       FREE_VARIABLE_VECTOR(bad_headers);
+
+       return 0;
+}
+
+static int unload_module(void)
+{
+       AST_TEST_UNREGISTER(test_message_msg_tech_registration);
+       AST_TEST_UNREGISTER(test_message_msg_handler_registration);
+       AST_TEST_UNREGISTER(test_message_manipulation);
+       AST_TEST_UNREGISTER(test_message_queue_dialplan_nominal);
+       AST_TEST_UNREGISTER(test_message_queue_handler_nominal);
+       AST_TEST_UNREGISTER(test_message_queue_both_nominal);
+       AST_TEST_UNREGISTER(test_message_has_destination_dialplan);
+       AST_TEST_UNREGISTER(test_message_has_destination_handler);
+       AST_TEST_UNREGISTER(test_message_msg_send);
+
+       if (test_message_context) {
+               ast_context_destroy(test_message_context, AST_MODULE);
+       }
+
+       ast_manager_unregister_hook(&user_event_hook);
+
+       return 0;
+}
+
+static int create_test_dialplan(void)
+{
+       int res = 0;
+
+       test_message_context = ast_context_find_or_create(NULL, NULL, TEST_CONTEXT, AST_MODULE);
+       if (!test_message_context) {
+               return -1;
+       }
+
+       res |= ast_add_extension(TEST_CONTEXT, 0, TEST_EXTENSION, 1, NULL, NULL,
+                                "UserEvent", "TestMessageUnitTest,Verify:To,Value:${MESSAGE(to)}",
+                                NULL, AST_MODULE);
+       res |= ast_add_extension(TEST_CONTEXT, 0, TEST_EXTENSION, 2, NULL, NULL,
+                                "UserEvent", "TestMessageUnitTest,Verify:From,Value:${MESSAGE(from)}",
+                                NULL, AST_MODULE);
+       res |= ast_add_extension(TEST_CONTEXT, 0, TEST_EXTENSION, 3, NULL, NULL,
+                                "UserEvent", "TestMessageUnitTest,Verify:Body,Value:${MESSAGE(body)}",
+                                NULL, AST_MODULE);
+       res |= ast_add_extension(TEST_CONTEXT, 0, TEST_EXTENSION, 4, NULL, NULL,
+                                "UserEvent", "TestMessageUnitTest,Verify:Custom,Value:${MESSAGE_DATA(custom_data)}",
+                                NULL, AST_MODULE);
+       res |= ast_add_extension(TEST_CONTEXT, 0, TEST_EXTENSION, 5, NULL, NULL,
+                                "Set", "MESSAGE_DATA(custom_data)=${MESSAGE_DATA(custom_data)}",
+                                NULL, AST_MODULE);
+       res |= ast_add_extension(TEST_CONTEXT, 0, TEST_EXTENSION, 6, NULL, NULL,
+                                "MessageSend", "testmsg:${MESSAGE(from)},testmsg:${MESSAGE(to)}",
+                                NULL, AST_MODULE);
+
+       ast_manager_register_hook(&user_event_hook);
+
+       return res;
+}
+
+static int load_module(void)
+{
+       AST_TEST_REGISTER(test_message_msg_tech_registration);
+       AST_TEST_REGISTER(test_message_msg_handler_registration);
+       AST_TEST_REGISTER(test_message_manipulation);
+       AST_TEST_REGISTER(test_message_queue_dialplan_nominal);
+       AST_TEST_REGISTER(test_message_queue_handler_nominal);
+       AST_TEST_REGISTER(test_message_queue_both_nominal);
+       AST_TEST_REGISTER(test_message_has_destination_dialplan);
+       AST_TEST_REGISTER(test_message_has_destination_handler);
+       AST_TEST_REGISTER(test_message_msg_send);
+
+       create_test_dialplan();
+
+       ast_test_register_init(TEST_CATEGORY, test_init_cb);
+       ast_test_register_cleanup(TEST_CATEGORY, test_cleanup_cb);
+
+       return AST_MODULE_LOAD_SUCCESS;
+}
+
+
+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Out-of-call text message support");