install_prereq: Add SUSE.
[asterisk/asterisk.git] / res / res_stasis_device_state.c
index 01b10d3..be09b15 100644 (file)
@@ -23,8 +23,6 @@
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
 #include "asterisk/astdb.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/module.h"
@@ -44,6 +42,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! Number of hash buckets for device state subscriptions */
 #define DEVICE_STATE_BUCKETS 37
 
+/*! The key used for tracking a subscription to all device states */
+#define DEVICE_STATE_ALL "__AST_DEVICE_STATE_ALL_TOPIC"
+
 /*! Container for subscribed device states */
 static struct ao2_container *device_state_subscriptions;
 
@@ -105,18 +106,23 @@ static int device_state_subscriptions_cmp(void *obj, void *arg, int flags)
 static void device_state_subscription_destroy(void *obj)
 {
        struct device_state_subscription *sub = obj;
-       sub->sub = stasis_unsubscribe(sub->sub);
        ast_string_field_free_memory(sub);
 }
 
 static struct device_state_subscription *device_state_subscription_create(
        const struct stasis_app *app, const char *device_name)
 {
-       struct device_state_subscription *sub = ao2_alloc(
-               sizeof(*sub), device_state_subscription_destroy);
+       struct device_state_subscription *sub;
        const char *app_name = stasis_app_name(app);
-       size_t size = strlen(device_name) + strlen(app_name) + 2;
+       size_t size;
+
+       if (ast_strlen_zero(device_name)) {
+               device_name = DEVICE_STATE_ALL;
+       }
+
+       size = strlen(device_name) + strlen(app_name) + 2;
 
+       sub = ao2_alloc(sizeof(*sub), device_state_subscription_destroy);
        if (!sub) {
                return NULL;
        }
@@ -139,13 +145,16 @@ static struct device_state_subscription *find_device_state_subscription(
                .device_name = name
        };
 
-       return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT);
+       return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
 }
 
 static void remove_device_state_subscription(
        struct device_state_subscription *sub)
 {
-       ao2_unlink(device_state_subscriptions, sub);
+       if (sub->sub) {
+               sub->sub = stasis_unsubscribe_and_join(sub->sub);
+       }
+       ao2_unlink_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
 }
 
 struct ast_json *stasis_app_device_state_to_json(
@@ -159,22 +168,22 @@ struct ast_json *stasis_app_device_state_to_json(
 struct ast_json *stasis_app_device_states_to_json(void)
 {
        struct ast_json *array = ast_json_array_create();
-       RAII_VAR(struct ast_db_entry *, tree,
-                ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
+       struct ast_db_entry *tree;
        struct ast_db_entry *entry;
 
+       tree = ast_db_gettree(DEVICE_STATE_FAMILY, NULL);
        for (entry = tree; entry; entry = entry->next) {
                const char *name = strrchr(entry->key, '/');
+
                if (!ast_strlen_zero(name)) {
-                       struct ast_str *device = ast_str_alloca(DEVICE_STATE_SIZE);
-                       ast_str_set(&device, 0, "%s%s",
-                                   DEVICE_STATE_SCHEME_STASIS, ++name);
-                       ast_json_array_append(
-                               array, stasis_app_device_state_to_json(
-                                       ast_str_buffer(device),
-                                       ast_device_state(ast_str_buffer(device))));
+                       char device[DEVICE_STATE_SIZE];
+
+                       snprintf(device, sizeof(device), "%s%s", DEVICE_STATE_SCHEME_STASIS, ++name);
+                       ast_json_array_append(array,
+                               stasis_app_device_state_to_json(device, ast_device_state(device)));
                }
        }
+       ast_db_freetree(tree);
 
        return array;
 }
@@ -282,7 +291,7 @@ static void populate_cache(void)
 
 static enum ast_device_state stasis_device_state_cb(const char *data)
 {
-       char buf[DEVICE_STATE_SIZE] = "";
+       char buf[DEVICE_STATE_SIZE];
 
        ast_db_get(DEVICE_STATE_FAMILY, data, buf, sizeof(buf));
 
@@ -294,6 +303,12 @@ static void device_state_cb(void *data, struct stasis_subscription *sub,
 {
        struct ast_device_state_message *device_state;
 
+       if (stasis_subscription_final_message(sub, msg)) {
+               /* Remove stasis subscription's reference to device_state_subscription */
+               ao2_ref(data, -1);
+               return;
+       }
+
        if (ast_device_state_message_type() != stasis_message_type(msg)) {
                return;
        }
@@ -314,39 +329,91 @@ static void *find_device_state(const struct stasis_app *app, const char *name)
 
 static int is_subscribed_device_state(struct stasis_app *app, const char *name)
 {
-       RAII_VAR(struct device_state_subscription *, sub,
-                find_device_state_subscription(app, name), ao2_cleanup);
-       return sub != NULL;
+       struct device_state_subscription *sub;
+
+       sub = find_device_state_subscription(app, DEVICE_STATE_ALL);
+       if (sub) {
+               ao2_ref(sub, -1);
+               return 1;
+       }
+
+       sub = find_device_state_subscription(app, name);
+       if (sub) {
+               ao2_ref(sub, -1);
+               return 1;
+       }
+
+       return 0;
+}
+
+static int is_subscribed_device_state_lock(struct stasis_app *app, const char *name)
+{
+       int is_subscribed;
+
+       ao2_lock(device_state_subscriptions);
+       is_subscribed = is_subscribed_device_state(app, name);
+       ao2_unlock(device_state_subscriptions);
+
+       return is_subscribed;
 }
 
 static int subscribe_device_state(struct stasis_app *app, void *obj)
 {
        struct device_state_subscription *sub = obj;
+       struct stasis_topic *topic;
+
+       if (!sub) {
+               sub = device_state_subscription_create(app, NULL);
+               if (!sub) {
+                       return -1;
+               }
+       }
+
+       if (strcmp(sub->device_name, DEVICE_STATE_ALL)) {
+               topic = ast_device_state_topic(sub->device_name);
+       } else {
+               topic = ast_device_state_topic_all();
+       }
 
-       ast_debug(3, "Subscribing to device %s", sub->device_name);
+       ao2_lock(device_state_subscriptions);
 
        if (is_subscribed_device_state(app, sub->device_name)) {
-               ast_log(LOG_WARNING, "Already subscribed to %s\n", sub->device_name);
-               return -1;
+               ao2_unlock(device_state_subscriptions);
+               ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
+               return 0;
        }
 
-       if (!(sub->sub = stasis_subscribe(
-                       ast_device_state_topic(sub->device_name),
-                       device_state_cb, sub))) {
+       ast_debug(3, "Subscribing to device %s\n", sub->device_name);
+
+       sub->sub = stasis_subscribe_pool(topic, device_state_cb, ao2_bump(sub));
+       if (!sub->sub) {
+               ao2_unlock(device_state_subscriptions);
                ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
                        sub->device_name);
+               /* Reference we added when attempting to stasis_subscribe_pool */
+               ao2_ref(sub, -1);
                return -1;
        }
 
-       ao2_link(device_state_subscriptions, sub);
+       ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
+       ao2_unlock(device_state_subscriptions);
+
        return 0;
 }
 
 static int unsubscribe_device_state(struct stasis_app *app, const char *name)
 {
-       RAII_VAR(struct device_state_subscription *, sub,
-                find_device_state_subscription(app, name), ao2_cleanup);
-       remove_device_state_subscription(sub);
+       struct device_state_subscription *sub;
+
+       ao2_lock(device_state_subscriptions);
+       sub = find_device_state_subscription(app, name);
+       if (sub) {
+               remove_device_state_subscription(sub);
+       }
+       ao2_unlock(device_state_subscriptions);
+
+       ao2_cleanup(sub);
+
        return 0;
 }
 
@@ -379,7 +446,7 @@ struct stasis_app_event_source device_state_event_source = {
        .find = find_device_state,
        .subscribe = subscribe_device_state,
        .unsubscribe = unsubscribe_device_state,
-       .is_subscribed = is_subscribed_device_state,
+       .is_subscribed = is_subscribed_device_state_lock,
        .to_json = devices_to_json
 };
 
@@ -388,13 +455,14 @@ static int load_module(void)
        populate_cache();
        if (ast_devstate_prov_add(DEVICE_STATE_PROVIDER_STASIS,
                                  stasis_device_state_cb)) {
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
 
        if (!(device_state_subscriptions = ao2_container_alloc(
                      DEVICE_STATE_BUCKETS, device_state_subscriptions_hash,
                      device_state_subscriptions_cmp))) {
-               return AST_MODULE_LOAD_FAILURE;
+               ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
+               return AST_MODULE_LOAD_DECLINE;
        }
 
        stasis_app_register_event_source(&device_state_event_source);
@@ -411,6 +479,8 @@ static int unload_module(void)
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application device state support",
+       .support_level = AST_MODULE_SUPPORT_CORE,
        .load = load_module,
        .unload = unload_module,
-       .nonoptreq = "res_stasis");
+       .requires = "res_stasis",
+);