ARI: Fix endpoint/channel subscription issues; allow for subscriptions to tech
authorMatthew Jordan <mjordan@digium.com>
Tue, 22 Jul 2014 16:20:58 +0000 (16:20 +0000)
committerMatthew Jordan <mjordan@digium.com>
Tue, 22 Jul 2014 16:20:58 +0000 (16:20 +0000)
This patch serves two purposes:
(1) It fixes some bugs with endpoint subscriptions not reporting all of the
    channel events
(2) It serves as the preliminary work needed for ASTERISK-23692, which allows
    for sending/receiving arbitrary out of call text messages through ARI in a
    technology agnostic fashion.

The messaging functionality described on ASTERISK-23692 requires two things:
(1) The ability to send/receive messages associated with an endpoint. This is
    relatively straight forwards with the endpoint core in Asterisk now.
(2) The ability to send/receive messages associated with a technology and an
    arbitrary technology defined URI. This is less straight forward, as
    endpoints are formed from a tech + resource pair. We don't have a
    mechanism to note that a technology that *may* have endpoints exists.

This patch provides such a mechanism, and fixes a few bugs along the way.

The first major bug this patch fixes is the forwarding of channel messages
to their respective endpoints. Prior to this patch, there were two problems:
(1) Channel caching messages weren't forwarded. Thus, the endpoints missed
    most of the interesting bits (such as channel creation, destruction, state
    changes, etc.)
(2) Channels weren't associated with their endpoint until after creation.
    This resulted in endpoints missing the channel creation message, which
    limited the usefulness of the subscription in the first place (a major use
    case being 'tell me when this endpoint has a channel'). Unfortunately,
    this meant another parameter to ast_channel_alloc. Since not all channel
    technologies support an ast_endpoint, this patch makes such a call
    optional and opts for a new function, ast_channel_alloc_with_endpoint.

When endpoints are created, they will implicitly create a technology endpoint
for their technology (if one does not already exist). A technology endpoint is
special in that it has no state, cannot have channels created for it, cannot
be created explicitly, and cannot be destroyed except on shutdown. It does,
however, have all messages from other endpoints in its technology forwarded to
it.

Combined with the bug fixes, we now have Stasis messages being properly
forwarded. Consider the following scenario: two PJSIP endpoints (foo and bar),
where bar has a single channel associated with it and foo has two channels
associated with it. The messages would be forwarded as follows:

channel PJSIP/foo-1 --
                      \
                       --> endpoint PJSIP/foo --
                      /                         \
channel PJSIP/foo-2 --                           \
                                                  ---- > endpoint PJSIP
                                                /
channel PJSIP/bar-1 -----> endpoint PJSIP/bar --

ARI, through the applications resource, can:
 - subscribe to endpoint:PJSIP/foo and get notifications for channels
   PJSIP/foo-1,PJSIP/foo-2 and endpoint PJSIP/foo
 - subscribe to endpoint:PJSIP/bar and get notifications for channels
   PJSIP/bar-1 and endpoint PJSIP/bar
 - subscribe to endpoint:PJSIP and get notifications for channels
   PJSIP/foo-1,PJSIP/foo-2,PJSIP/bar-1 and endpoints PJSIP/foo,PJSIP/bar

Note that since endpoint PJSIP never changes, it never has events itself. It
merely provides an aggregation point for all other endpoints in its technology
(which in turn aggregate all channel messages associated with that endpoint).

This patch also adds endpoints to res_xmpp and chan_motif, because the actual
messaging work will need it (messaging without XMPP is just sad).

Review: https://reviewboard.asterisk.org/r/3760/

ASTERISK-23692
........

Merged revisions 419196 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@419203 65c4cc65-6c06-0410-ace0-fbb531ad65f3

14 files changed:
channels/chan_iax2.c
channels/chan_motif.c
channels/chan_pjsip.c
channels/chan_sip.c
include/asterisk/channel.h
include/asterisk/endpoints.h
include/asterisk/xmpp.h
main/channel.c
main/channel_internal_api.c
main/endpoints.c
res/ari/resource_applications.h
res/ari/resource_endpoints.c
res/res_xmpp.c
rest-api/api-docs/applications.json

index 424f5ad..59f7944 100644 (file)
@@ -5879,12 +5879,14 @@ static int iax2_getpeertrunk(struct ast_sockaddr addr)
 /*! \brief  Create new call, interface with the PBX core */
 static struct ast_channel *ast_iax2_new(int callno, int state, iax2_format capability, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, unsigned int cachable)
 {
-       struct ast_channel *tmp;
+       struct ast_channel *tmp = NULL;
        struct chan_iax2_pvt *i;
+       struct iax2_peer *peer;
        struct ast_variable *v = NULL;
        struct ast_format_cap *native;
        struct ast_format *tmpfmt;
        struct ast_callid *callid;
+       char *peer_name = NULL;
 
        if (!(i = iaxs[callno])) {
                ast_log(LOG_WARNING, "No IAX2 pvt found for callno '%d' !\n", callno);
@@ -5896,9 +5898,27 @@ static struct ast_channel *ast_iax2_new(int callno, int state, iax2_format capab
                return NULL;
        }
 
-       /* Don't hold call lock */
+       if (!ast_strlen_zero(i->peer)) {
+               peer_name = ast_strdupa(i->peer);
+       } else if (!ast_strlen_zero(i->host)) {
+               peer_name = ast_strdupa(i->host);
+       }
+
+       /* Don't hold call lock while making a channel or looking up a peer */
        ast_mutex_unlock(&iaxsl[callno]);
-       tmp = ast_channel_alloc(1, state, i->cid_num, i->cid_name, i->accountcode, i->exten, i->context, assignedids, requestor, i->amaflags, "IAX2/%s-%d", i->host, i->callno);
+
+       if (!ast_strlen_zero(peer_name)) {
+               peer = find_peer(peer_name, 1);
+               if (peer && peer->endpoint) {
+                       tmp = ast_channel_alloc_with_endpoint(1, state, i->cid_num, i->cid_name, i->accountcode, i->exten, i->context, assignedids, requestor, i->amaflags, peer->endpoint, "IAX2/%s-%d", i->host, i->callno);
+               }
+               ao2_cleanup(peer);
+       }
+
+       if (!tmp) {
+               tmp = ast_channel_alloc(1, state, i->cid_num, i->cid_name, i->accountcode, i->exten, i->context, assignedids, requestor, i->amaflags, "IAX2/%s-%d", i->host, i->callno);
+       }
+
        ast_mutex_lock(&iaxsl[callno]);
        if (i != iaxs[callno]) {
                if (tmp) {
index 1bdc8aa..e294852 100644 (file)
@@ -75,6 +75,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/causes.h"
 #include "asterisk/abstract_jb.h"
 #include "asterisk/xmpp.h"
+#include "asterisk/endpoints.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/format_cache.h"
 
@@ -783,7 +784,7 @@ static struct ast_channel *jingle_new(struct jingle_endpoint *endpoint, struct j
                return NULL;
        }
 
-       if (!(chan = ast_channel_alloc(1, state, S_OR(title, ""), S_OR(cid_name, ""), "", "", "", assignedids, requestor, 0, "Motif/%s-%04lx", str, (unsigned long)(ast_random() & 0xffff)))) {
+       if (!(chan = ast_channel_alloc_with_endpoint(1, state, S_OR(title, ""), S_OR(cid_name, ""), "", "", "", assignedids, requestor, 0, endpoint->connection->endpoint, "Motif/%s-%04lx", str, (unsigned long)(ast_random() & 0xffff)))) {
                ao2_ref(caps, -1);
                return NULL;
        }
index 5812360..f638a1e 100644 (file)
@@ -371,8 +371,12 @@ static struct ast_channel *chan_pjsip_new(struct ast_sip_session *session, int s
                return NULL;
        }
 
-       if (!(chan = ast_channel_alloc(1, state, S_OR(session->id.number.str, ""), S_OR(session->id.name.str, ""), session->endpoint->accountcode, "", "", assignedids, requestor, 0, "PJSIP/%s-%08x", ast_sorcery_object_get_id(session->endpoint),
-               (unsigned)ast_atomic_fetchadd_int((int *)&chan_idx, +1)))) {
+       chan = ast_channel_alloc_with_endpoint(1, state, S_OR(session->id.number.str, ""),
+                                S_OR(session->id.name.str, ""), session->endpoint->accountcode, "",
+                                "", assignedids, requestor, 0, session->endpoint->persistent,
+                                "PJSIP/%s-%08x", ast_sorcery_object_get_id(session->endpoint),
+                                (unsigned)ast_atomic_fetchadd_int((int *)&chan_idx, +1));
+       if (!chan) {
                ao2_ref(caps, -1);
                return NULL;
        }
@@ -455,8 +459,6 @@ static struct ast_channel *chan_pjsip_new(struct ast_sip_session *session, int s
                ast_rtp_instance_set_channel_id(pvt->media[SIP_MEDIA_VIDEO]->rtp, ast_channel_uniqueid(chan));
        }
 
-       ast_endpoint_add_channel(session->endpoint->persistent, chan);
-
        return chan;
 }
 
index 44629dc..2a32971 100644 (file)
@@ -8071,9 +8071,14 @@ static struct ast_channel *sip_new(struct sip_pvt *i, int state, const char *tit
                        my_name = ast_strdupa(i->fromdomain);
                }
 
-               sip_pvt_unlock(i);
                /* Don't hold a sip pvt lock while we allocate a channel */
-               tmp = ast_channel_alloc(1, state, i->cid_num, i->cid_name, i->accountcode, i->exten, i->context, assignedids, requestor, i->amaflags, "SIP/%s-%08x", my_name, (unsigned)ast_atomic_fetchadd_int((int *)&chan_idx, +1));
+               sip_pvt_unlock(i);
+
+               if (i->relatedpeer && i->relatedpeer->endpoint) {
+                       tmp = ast_channel_alloc_with_endpoint(1, state, i->cid_num, i->cid_name, i->accountcode, i->exten, i->context, assignedids, requestor, i->amaflags, i->relatedpeer->endpoint, "SIP/%s-%08x", my_name, (unsigned)ast_atomic_fetchadd_int((int *)&chan_idx, +1));
+               } else {
+                       tmp = ast_channel_alloc(1, state, i->cid_num, i->cid_name, i->accountcode, i->exten, i->context, assignedids, requestor, i->amaflags, "SIP/%s-%08x", my_name, (unsigned)ast_atomic_fetchadd_int((int *)&chan_idx, +1));
+               }
        }
        if (!tmp) {
                ast_log(LOG_WARNING, "Unable to allocate AST channel structure for SIP channel\n");
@@ -8082,16 +8087,6 @@ static struct ast_channel *sip_new(struct sip_pvt *i, int state, const char *tit
                return NULL;
        }
 
-       if (i->relatedpeer && i->relatedpeer->endpoint) {
-               if (ast_endpoint_add_channel(i->relatedpeer->endpoint, tmp)) {
-                       ast_channel_unlock(tmp);
-                       ast_channel_unref(tmp);
-                       ao2_ref(caps, -1);
-                       sip_pvt_lock(i);
-                       return NULL;
-               }
-       }
-
        ast_channel_stage_snapshot(tmp);
 
        /* If we sent in a callid, bind it to the channel. */
index d118bc8..d5d32c9 100644 (file)
@@ -1158,11 +1158,12 @@ struct ast_datastore *ast_channel_datastore_find(struct ast_channel *chan, const
  *       and "default" context.
  * \note Since 12.0.0 this function returns with the newly created channel locked.
  */
-struct ast_channel * attribute_malloc __attribute__((format(printf, 14, 15)))
+struct ast_channel * attribute_malloc __attribute__((format(printf, 15, 16)))
        __ast_channel_alloc(int needqueue, int state, const char *cid_num,
                const char *cid_name, const char *acctcode,
                const char *exten, const char *context, const struct ast_assigned_ids *assignedids,
                const struct ast_channel *requestor, enum ama_flags amaflag,
+               struct ast_endpoint *endpoint,
                const char *file, int line, const char *function,
                const char *name_fmt, ...);
 
@@ -1178,9 +1179,14 @@ struct ast_channel * attribute_malloc __attribute__((format(printf, 14, 15)))
  * \note Since 12.0.0 this function returns with the newly created channel locked.
  */
 #define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag, ...) \
-       __ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag, \
+       __ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag, NULL, \
                __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
 
+#define ast_channel_alloc_with_endpoint(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag, endpoint, ...) \
+       __ast_channel_alloc((needqueue), (state), (cid_num), (cid_name), (acctcode), (exten), (context), (assignedids), (requestor), (amaflag), (endpoint), \
+               __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
+
+
 #if defined(REF_DEBUG) || defined(__AST_DEBUG_MALLOC)
 /*!
  * \brief Create a fake channel structure
index 7a7a3f6..663dd94 100644 (file)
@@ -77,11 +77,17 @@ const char *ast_endpoint_state_to_string(enum ast_endpoint_state state);
 struct ast_endpoint;
 
 /*!
- * \brief Finds the endpoint with the given tech/resource id.
+ * \brief Finds the endpoint with the given tech[/resource] id.
  *
  * Endpoints are refcounted, so ao2_cleanup() when you're done.
  *
- * \param id Tech/resource id to look for.
+ * \note The resource portion of an ID is optional. If not provided,
+ *       an aggregate endpoint for the entire technology is returned.
+ *       These endpoints must not be modified, but can be subscribed
+ *       to in order to receive updates for all endpoints of a given
+ *       technology.
+ *
+ * \param id Tech[/resource] id to look for.
  * \return Associated endpoint.
  * \return \c NULL if not found.
  *
@@ -131,6 +137,9 @@ const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint);
  *
  * This is unique for the endpoint's technology, and immutable.
  *
+ * \note If the endpoint being queried is a technology aggregate
+ *       endpoint, this will be an empty string.
+ *
  * \param endpoint The endpoint.
  * \return Resource name of the endpoint.
  * \return \c NULL if endpoint is \c NULL.
index 58b14e4..294b4fd 100644 (file)
@@ -106,6 +106,8 @@ struct ast_xmpp_message {
        AST_LIST_ENTRY(ast_xmpp_message) list; /*!< Linked list information */
 };
 
+struct ast_endpoint;
+
 /*! \brief XMPP Buddy */
 struct ast_xmpp_buddy {
        char id[XMPP_MAX_JIDLEN];        /*!< JID of the buddy */
@@ -116,9 +118,11 @@ struct ast_xmpp_buddy {
 /*! \brief XMPP Client Connection */
 struct ast_xmpp_client {
        AST_DECLARE_STRING_FIELDS(
-               AST_STRING_FIELD(name); /*!< Name of the client configuration */
+               /*! Name of the client configuration */
+               AST_STRING_FIELD(name);
                );
-       char mid[6]; /* Message ID */
+       /*! Message ID */
+       char mid[6];
        iksid *jid;
        iksparser *parser;
        iksfilter *filter;
@@ -134,9 +138,14 @@ struct ast_xmpp_client {
        AST_LIST_HEAD(, ast_xmpp_message) messages;
        pthread_t thread;
        int timeout;
-       unsigned int reconnect:1; /*!< Reconnect this client */
-       struct stasis_subscription *mwi_sub; /*!< If distributing event information the MWI subscription */
-       struct stasis_subscription *device_state_sub; /*!< If distributing event information the device state subscription */
+       /*! Reconnect this client */
+       unsigned int reconnect:1;
+       /*! If distributing event information the MWI subscription */
+       struct stasis_subscription *mwi_sub;
+       /*! If distributing event information the device state subscription */
+       struct stasis_subscription *device_state_sub;
+       /*! The endpoint associated with this client */
+       struct ast_endpoint *endpoint;
 };
 
 /*!
index b17ddde..e9e37c0 100644 (file)
@@ -783,10 +783,11 @@ static void ast_channel_destructor(void *obj);
 static void ast_dummy_channel_destructor(void *obj);
 
 /*! \brief Create a new channel structure */
-static struct ast_channel * attribute_malloc __attribute__((format(printf, 13, 0)))
+static struct ast_channel * attribute_malloc __attribute__((format(printf, 15, 0)))
 __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char *cid_name,
                       const char *acctcode, const char *exten, const char *context, const struct ast_assigned_ids *assignedids,
-                      const struct ast_channel *requestor, enum ama_flags amaflag, const char *file, int line,
+                      const struct ast_channel *requestor, enum ama_flags amaflag, struct ast_endpoint *endpoint,
+                      const char *file, int line,
                       const char *function, const char *name_fmt, va_list ap)
 {
        struct ast_channel *tmp;
@@ -963,6 +964,10 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
 
        ao2_link(channels, tmp);
 
+       if (endpoint) {
+               ast_endpoint_add_channel(endpoint, tmp);
+       }
+
        /*
         * And now, since the channel structure is built, and has its name, let
         * the world know of its existance
@@ -975,6 +980,7 @@ struct ast_channel *__ast_channel_alloc(int needqueue, int state, const char *ci
                                        const char *cid_name, const char *acctcode,
                                        const char *exten, const char *context, const struct ast_assigned_ids *assignedids,
                                        const struct ast_channel *requestor, enum ama_flags amaflag,
+                                       struct ast_endpoint *endpoint,
                                        const char *file, int line, const char *function,
                                        const char *name_fmt, ...)
 {
@@ -983,7 +989,7 @@ struct ast_channel *__ast_channel_alloc(int needqueue, int state, const char *ci
 
        va_start(ap, name_fmt);
        result = __ast_channel_alloc_ap(needqueue, state, cid_num, cid_name, acctcode, exten, context,
-                                       assignedids, requestor, amaflag, file, line, function, name_fmt, ap);
+                                       assignedids, requestor, amaflag, endpoint, file, line, function, name_fmt, ap);
        va_end(ap);
 
        return result;
index 4ad0ef3..8a9e18e 100644 (file)
@@ -219,6 +219,7 @@ struct ast_channel {
        struct timeval sending_dtmf_tv;         /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
        struct stasis_cp_single *topics;                /*!< Topic for all channel's events */
        struct stasis_forward *endpoint_forward;        /*!< Subscription for event forwarding to endpoint's topic */
+       struct stasis_forward *endpoint_cache_forward; /*!< Subscription for cache updates to endpoint's topic */
 };
 
 /*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1528,6 +1529,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
        ast_string_field_free_memory(chan);
 
        chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
+       chan->endpoint_cache_forward = stasis_forward_cancel(chan->endpoint_cache_forward);
 
        stasis_cp_single_unsubscribe(chan->topics);
        chan->topics = NULL;
@@ -1570,8 +1572,14 @@ int ast_channel_forward_endpoint(struct ast_channel *chan,
        chan->endpoint_forward =
                stasis_forward_all(ast_channel_topic(chan),
                        ast_endpoint_topic(endpoint));
+       if (!chan->endpoint_forward) {
+               return -1;
+       }
 
-       if (chan->endpoint_forward == NULL) {
+       chan->endpoint_cache_forward = stasis_forward_all(ast_channel_topic_cached(chan),
+               ast_endpoint_topic(endpoint));
+       if (!chan->endpoint_cache_forward) {
+               chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
                return -1;
        }
 
index 10b32e2..985f6e6 100644 (file)
@@ -46,8 +46,13 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! Buckets for endpoint hash. Keep it prime! */
 #define ENDPOINT_BUCKETS 127
 
+/*! Buckets for technology endpoints. */
+#define TECH_ENDPOINT_BUCKETS 11
+
 static struct ao2_container *endpoints;
 
+static struct ao2_container *tech_endpoints;
+
 struct ast_endpoint {
        AST_DECLARE_STRING_FIELDS(
                AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
@@ -69,6 +74,8 @@ struct ast_endpoint {
        struct stasis_message_router *router;
        /*! ast_str_container of channels associated with this endpoint */
        struct ao2_container *channel_ids;
+       /*! Forwarding subscription from an endpoint to its tech endpoint */
+       struct stasis_forward *tech_forward;
 };
 
 static int endpoint_hash(const void *obj, int flags)
@@ -121,7 +128,13 @@ static int endpoint_cmp(void *obj, void *arg, int flags)
 
 struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
 {
-       return ao2_find(endpoints, id, OBJ_KEY);
+       struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
+
+       if (!endpoint) {
+               endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
+       }
+
+       return endpoint;
 }
 
 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
@@ -181,6 +194,8 @@ static void endpoint_dtor(void *obj)
        ao2_cleanup(endpoint->router);
        endpoint->router = NULL;
 
+       endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
+
        stasis_cp_single_unsubscribe(endpoint->topics);
        endpoint->topics = NULL;
 
@@ -196,6 +211,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
 {
        ast_assert(chan != NULL);
        ast_assert(endpoint != NULL);
+       ast_assert(!ast_strlen_zero(endpoint->resource));
 
        ast_channel_forward_endpoint(chan, endpoint);
 
@@ -242,19 +258,21 @@ static void endpoint_default(void *data,
        }
 }
 
-struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
+static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
 {
        RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
        int r = 0;
 
-       if (ast_strlen_zero(tech)) {
-               ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
-               return NULL;
-       }
-
-       if (ast_strlen_zero(resource)) {
-               ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
-               return NULL;
+       /* Get/create the technology endpoint */
+       if (!ast_strlen_zero(resource)) {
+               tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
+               if (!tech_endpoint) {
+                       tech_endpoint = endpoint_internal_create(tech, NULL);
+                       if (!tech_endpoint) {
+                               return NULL;
+                       }
+               }
        }
 
        endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
@@ -268,10 +286,12 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
        if (ast_string_field_init(endpoint, 80) != 0) {
                return NULL;
        }
-
        ast_string_field_set(endpoint, tech, tech);
-       ast_string_field_set(endpoint, resource, resource);
-       ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
+       ast_string_field_set(endpoint, resource, S_OR(resource, ""));
+       ast_string_field_build(endpoint, id, "%s%s%s",
+               tech,
+               !ast_strlen_zero(resource) ? "/" : "",
+               S_OR(resource, ""));
 
        /* All access to channel_ids should be covered by the endpoint's
         * lock; no extra lock needed. */
@@ -287,24 +307,47 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
                return NULL;
        }
 
-       endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
-       if (!endpoint->router) {
-               return NULL;
-       }
-       r |= stasis_message_router_add(endpoint->router,
-               stasis_cache_clear_type(), endpoint_cache_clear,
-               endpoint);
-       r |= stasis_message_router_set_default(endpoint->router,
-               endpoint_default, endpoint);
-
-       endpoint_publish_snapshot(endpoint);
+       if (!ast_strlen_zero(resource)) {
+               endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
+               if (!endpoint->router) {
+                       return NULL;
+               }
+               r |= stasis_message_router_add(endpoint->router,
+                       stasis_cache_clear_type(), endpoint_cache_clear,
+                       endpoint);
+               r |= stasis_message_router_set_default(endpoint->router,
+                       endpoint_default, endpoint);
+               if (r) {
+                       return NULL;
+               }
 
-       ao2_link(endpoints, endpoint);
+               endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
+                       stasis_cp_single_topic(tech_endpoint->topics));
+               endpoint_publish_snapshot(endpoint);
+               ao2_link(endpoints, endpoint);
+       } else {
+               ao2_link(tech_endpoints, endpoint);
+       }
 
        ao2_ref(endpoint, +1);
        return endpoint;
 }
 
+struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
+{
+       if (ast_strlen_zero(tech)) {
+               ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
+               return NULL;
+       }
+
+       if (ast_strlen_zero(resource)) {
+               ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
+               return NULL;
+       }
+
+       return endpoint_internal_create(tech, resource);
+}
+
 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
 {
        RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -368,6 +411,8 @@ void ast_endpoint_set_state(struct ast_endpoint *endpoint,
        enum ast_endpoint_state state)
 {
        ast_assert(endpoint != NULL);
+       ast_assert(!ast_strlen_zero(endpoint->resource));
+
        ao2_lock(endpoint);
        endpoint->state = state;
        ao2_unlock(endpoint);
@@ -378,6 +423,8 @@ void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
        int max_channels)
 {
        ast_assert(endpoint != NULL);
+       ast_assert(!ast_strlen_zero(endpoint->resource));
+
        ao2_lock(endpoint);
        endpoint->max_channels = max_channels;
        ao2_unlock(endpoint);
@@ -407,6 +454,9 @@ struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
        void *obj;
        SCOPED_AO2LOCK(lock, endpoint);
 
+       ast_assert(endpoint != NULL);
+       ast_assert(!ast_strlen_zero(endpoint->resource));
+
        channel_count = ao2_container_count(endpoint->channel_ids);
 
        snapshot = ao2_alloc(
@@ -440,6 +490,9 @@ static void endpoint_cleanup(void)
 {
        ao2_cleanup(endpoints);
        endpoints = NULL;
+
+       ao2_cleanup(tech_endpoints);
+       tech_endpoints = NULL;
 }
 
 int ast_endpoint_init(void)
@@ -448,10 +501,15 @@ int ast_endpoint_init(void)
 
        endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
                endpoint_cmp);
-
        if (!endpoints) {
                return -1;
        }
 
+       tech_endpoints = ao2_container_alloc(TECH_ENDPOINT_BUCKETS, endpoint_hash,
+               endpoint_cmp);
+       if (!tech_endpoints) {
+               return -1;
+       }
+
        return 0;
 }
index 888f513..be62e9d 100644 (file)
@@ -67,7 +67,7 @@ void ast_ari_applications_get(struct ast_variable *headers, struct ast_ari_appli
 struct ast_ari_applications_subscribe_args {
        /*! Application's name */
        const char *application_name;
-       /*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName} */
+       /*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}[/{resource}], deviceState:{deviceName} */
        const char **event_source;
        /*! Length of event_source array. */
        size_t event_source_count;
@@ -99,7 +99,7 @@ void ast_ari_applications_subscribe(struct ast_variable *headers, struct ast_ari
 struct ast_ari_applications_unsubscribe_args {
        /*! Application's name */
        const char *application_name;
-       /*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName} */
+       /*! Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}[/{resource}], deviceState:{deviceName} */
        const char **event_source;
        /*! Length of event_source array. */
        size_t event_source_count;
index 16b7ebd..ff2b150 100644 (file)
@@ -34,6 +34,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_app.h"
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/channel.h"
+#include "asterisk/message.h"
 
 void ast_ari_endpoints_list(struct ast_variable *headers,
        struct ast_ari_endpoints_list_args *args,
@@ -82,6 +83,7 @@ void ast_ari_endpoints_list(struct ast_variable *headers,
 
        ast_ari_response_ok(response, ast_json_ref(json));
 }
+
 void ast_ari_endpoints_list_by_tech(struct ast_variable *headers,
        struct ast_ari_endpoints_list_by_tech_args *args,
        struct ast_ari_response *response)
@@ -89,14 +91,17 @@ void ast_ari_endpoints_list_by_tech(struct ast_variable *headers,
        RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
        RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
        RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+       struct ast_endpoint *tech_endpoint;
        struct ao2_iterator i;
        void *obj;
 
-       if (!ast_get_channel_tech(args->tech)) {
+       tech_endpoint = ast_endpoint_find_by_id(args->tech);
+       if (!tech_endpoint) {
                ast_ari_response_error(response, 404, "Not Found",
                                       "No Endpoints found - invalid tech %s", args->tech);
                return;
        }
+       ao2_ref(tech_endpoint, -1);
 
        cache = ast_endpoint_cache();
        if (!cache) {
@@ -146,6 +151,7 @@ void ast_ari_endpoints_list_by_tech(struct ast_variable *headers,
        ao2_iterator_destroy(&i);
        ast_ari_response_ok(response, ast_json_ref(json));
 }
+
 void ast_ari_endpoints_get(struct ast_variable *headers,
        struct ast_ari_endpoints_get_args *args,
        struct ast_ari_response *response)
index 0cfc37b..758a5f0 100644 (file)
@@ -559,6 +559,10 @@ static void xmpp_client_destructor(void *obj)
 
        ast_xmpp_client_disconnect(client);
 
+       ast_endpoint_shutdown(client->endpoint);
+       ao2_cleanup(client->endpoint);
+       client->endpoint = NULL;
+
        if (client->filter) {
                iks_filter_delete(client->filter);
        }
@@ -593,6 +597,20 @@ static int xmpp_buddy_cmp(void *obj, void *arg, int flags)
        return !strcmp(buddy1->id, flags & OBJ_KEY ? id : buddy2->id) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+/*! \brief Internal function which changes the XMPP client state */
+static void xmpp_client_change_state(struct ast_xmpp_client *client, int state)
+{
+       if (state == client->state) {
+               return;
+       }
+       client->state = state;
+       if (client->state == XMPP_STATE_DISCONNECTED) {
+               ast_endpoint_set_state(client->endpoint, AST_ENDPOINT_OFFLINE);
+       } else if (client->state == XMPP_STATE_CONNECTED) {
+               ast_endpoint_set_state(client->endpoint, AST_ENDPOINT_ONLINE);
+       }
+}
+
 /*! \brief Allocator function for ast_xmpp_client */
 static struct ast_xmpp_client *xmpp_client_alloc(const char *name)
 {
@@ -605,6 +623,12 @@ static struct ast_xmpp_client *xmpp_client_alloc(const char *name)
        AST_LIST_HEAD_INIT(&client->messages);
        client->thread = AST_PTHREADT_NULL;
 
+       client->endpoint = ast_endpoint_create("XMPP", name);
+       if (!client->endpoint) {
+               ao2_ref(client, -1);
+               return NULL;
+       }
+
        if (!(client->buddies = ao2_container_alloc(BUDDY_BUCKETS, xmpp_buddy_hash, xmpp_buddy_cmp))) {
                ast_log(LOG_ERROR, "Could not initialize buddy container for '%s'\n", name);
                ao2_ref(client, -1);
@@ -626,7 +650,7 @@ static struct ast_xmpp_client *xmpp_client_alloc(const char *name)
        ast_string_field_set(client, name, name);
 
        client->timeout = 50;
-       client->state = XMPP_STATE_DISCONNECTED;
+       xmpp_client_change_state(client, XMPP_STATE_DISCONNECTED);
        ast_copy_string(client->mid, "aaaaa", sizeof(client->mid));
 
        return client;
@@ -2213,12 +2237,6 @@ static const struct ast_msg_tech msg_tech = {
        .msg_send = xmpp_send_cb,
 };
 
-/*! \brief Internal function which changes the XMPP client state */
-static void xmpp_client_change_state(struct ast_xmpp_client *client, int state)
-{
-       client->state = state;
-}
-
 /*! \brief Internal function which creates a buddy on a client */
 static struct ast_xmpp_buddy *xmpp_client_create_buddy(struct ao2_container *container, const char *id)
 {
@@ -3530,7 +3548,7 @@ static int xmpp_action_hook(void *data, int type, iks *node)
 int ast_xmpp_client_disconnect(struct ast_xmpp_client *client)
 {
        if ((client->thread != AST_PTHREADT_NULL) && !pthread_equal(pthread_self(), client->thread)) {
-               client->state = XMPP_STATE_DISCONNECTING;
+               xmpp_client_change_state(client, XMPP_STATE_DISCONNECTING);
                pthread_join(client->thread, NULL);
                client->thread = AST_PTHREADT_NULL;
        }
@@ -3559,7 +3577,7 @@ int ast_xmpp_client_disconnect(struct ast_xmpp_client *client)
                iks_disconnect(client->parser);
        }
 
-       client->state = XMPP_STATE_DISCONNECTED;
+       xmpp_client_change_state(client, XMPP_STATE_DISCONNECTED);
 
        return 0;
 }
@@ -3774,7 +3792,7 @@ static void *xmpp_client_thread(void *data)
                        ast_log(LOG_WARNING, "JABBER: Not Supported\n");
                } else if (res == IKS_NET_DROPPED) {
                        ast_log(LOG_WARNING, "JABBER: Dropped?\n");
-               } else {
+               } else if (res == IKS_NET_UNKNOWN) {
                        ast_debug(5, "JABBER: Unknown\n");
                }
 
index 132dd64..cf0731c 100644 (file)
@@ -68,7 +68,7 @@
                                                },
                                                {
                                                        "name": "eventSource",
-                                                       "description": "URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName}",
+                                                       "description": "URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}[/{resource}], deviceState:{deviceName}",
                                                        "paramType": "query",
                                                        "required": true,
                                                        "allowMultiple": true,
                                                },
                                                {
                                                        "name": "eventSource",
-                                                       "description": "URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource}, deviceState:{deviceName}",
+                                                       "description": "URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}[/{resource}], deviceState:{deviceName}",
                                                        "paramType": "query",
                                                        "required": true,
                                                        "allowMultiple": true,