#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
+ /*! Optional router for handling endpoint messages in 'all' subscriptions */
+ struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
+ /*! Subscription model for the application */
+ enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !chan) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
- forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (chan) {
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_channel_topic_cached(chan), app->topic);
- if (!forwards->topic_cached_forward) {
+ chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !bridge) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, bridge->uniqueid);
+ forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
- forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (bridge) {
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_bridge_topic_cached(bridge), app->topic);
- if (!forwards->topic_cached_forward) {
+ bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ struct stasis_app *app = data;
+
+ stasis_publish(app->topic, message);
+}
+
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
+ int ret = 0;
- if (!app || !endpoint) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+ forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
- forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
- }
+ if (endpoint) {
+ forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+ app->topic);
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_endpoint_topic_cached(endpoint), app->topic);
+
+ if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
+ } else {
+ /* Since endpoint subscriptions also subscribe to channels, in the case
+ * of all endpoint subscriptions, we only want messages for the endpoints.
+ * As such, we route those particular messages and then re-publish them
+ * on the app's topic.
+ */
+ ast_assert(app->endpoint_router == NULL);
+ app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+ if (!app->endpoint_router) {
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
- forwards->topic_cached_forward = stasis_forward_all(
- ast_endpoint_topic_cached(endpoint), app->topic);
- if (!forwards->topic_cached_forward) {
- /* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
- return NULL;
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_state_type(), endpoint_state_cb, app);
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+ if (ret) {
+ ao2_ref(app->endpoint_router, -1);
+ app->endpoint_router = NULL;
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
}
- ao2_ref(forwards, +1);
return forwards;
}
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
+ ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
}
}
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
if (!app) {
return NULL;
}
+ app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
return app;
}
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
return app->topic;
}
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
+ stasis_message_router_unsubscribe(app->endpoint_router);
+ app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
int res;
- if (!app || !chan) {
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_channel(app, chan);
- if (!forwards) {
- return -1;
- }
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
- res = ao2_link_flags(app->forwards, forwards,
- OBJ_NOLOCK);
- if (!res) {
- return -1;
- }
+ forwards = ao2_find(app->forwards,
+ chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
}
- ++forwards->interested;
- ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
- return 0;
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ ao2_ref(forwards, -1);
+ return -1;
+ }
}
+
+ ++forwards->interested;
+ ast_debug(3, "Channel '%s' is %d interested in %s\n",
+ chan ? ast_channel_uniqueid(chan) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
+ if (!id) {
+ if (!strcmp(kind, "bridge")) {
+ id = BRIDGE_ALL;
+ } else if (!strcmp(kind, "channel")) {
+ id = CHANNEL_ALL;
+ } else if (!strcmp(kind, "endpoint")) {
+ id = ENDPOINT_ALL;
+ } else {
+ ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+ return -1;
+ }
+ }
+
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
- if (!app || !chan) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+ return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
- if (!app || !channel_id) {
+ if (!app) {
return -1;
}
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id)) {
+ channel_id = CHANNEL_ALL;
+ }
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, bridge->uniqueid,
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_bridge(app, bridge);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ return -1;
}
-
- ++forwards->interested;
- ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
- return 0;
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+ bridge ? bridge->uniqueid : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+ return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- if (!app || !bridge_id) {
+ if (!app) {
return -1;
}
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
- return forwards != NULL;
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ if (ast_strlen_zero(bridge_id)) {
+ bridge_id = BRIDGE_ALL;
+ }
+
+ forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 0;
+ }
+ forwards = ao2_find(app->forwards,
+ endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_endpoint(app, endpoint);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
- /* Subscribe for messages */
- messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+ return -1;
}
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
- ++forwards->interested;
- ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
- return 0;
+ /* Subscribe for messages */
+ messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
+
+ ++forwards->interested;
+ ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+ endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
- if (!app || !endpoint_id) {
+ if (!app) {
return -1;
}
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(endpoint_id)) {
+ endpoint_id = ENDPOINT_ALL;
+ }
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
#include "messaging.h"
/*!
+ * \brief Subscription to all technologies
+ */
+#define TECH_WILDCARD "__AST_ALL_TECH"
+
+/*!
* \brief Number of buckets for the \ref endpoint_subscriptions container
*/
#define ENDPOINTS_NUM_BUCKETS 127
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
- || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+ if (!sub) {
+ continue;
+ }
+
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
- sub = NULL; /* No ref bump! */
goto match;
}
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
+ ao2_ref(sub, -1);
goto match;
}
return 0;
match:
- ao2_cleanup(sub);
return 1;
}
continue;
}
- if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
{
struct message_subscription *sub = NULL;
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
} else {
int i;
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+ if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
ao2_bump(sub);
break;
}
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
endpoint = ast_endpoint_find_by_id(endpoint_id);
- if (!endpoint) {
- return;
- }
-
sub = get_subscription(endpoint);
if (!sub) {
return;
AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
if (AST_VECTOR_SIZE(&sub->applications) == 0) {
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_unlink(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
- AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
ast_rwlock_unlock(&tech_subscriptions_lock);
}
ao2_unlock(sub);
ao2_ref(sub, -1);
- ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
}
static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
return sub;
}
- sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+ sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
if (!sub) {
return NULL;
}
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_link(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
AST_VECTOR_APPEND(&sub->applications, tuple);
ao2_unlock(sub);
- ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
return 0;
}