stasis: Add internal filtering of messages.
[asterisk/asterisk.git] / main / stasis.c
index fe940d3..93112d9 100644 (file)
@@ -29,8 +29,6 @@
 
 #include "asterisk.h"
 
-ASTERISK_REGISTER_FILE();
-
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
@@ -59,6 +57,7 @@ ASTERISK_REGISTER_FILE();
                        </description>
                        <see-also>
                                <ref type="application">UserEvent</ref>
+                               <ref type="managerEvent">UserEvent</ref>
                        </see-also>
                </managerEventInstance>
        </managerEvent>
@@ -371,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
        return topic->name;
 }
 
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+       return AST_VECTOR_SIZE(&topic->subscribers);
+}
+
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
@@ -392,6 +396,11 @@ struct stasis_subscription {
        /*! Flag set when final message for sub has been processed.
         *  Be sure join_lock is held before reading/setting. */
        int final_message_processed;
+
+       /*! The message types this subscription is accepting */
+       AST_VECTOR(, char) accepted_message_types;
+       /*! The message filter currently in use */
+       enum stasis_subscription_message_filter filter;
 };
 
 static void subscription_dtor(void *obj)
@@ -410,6 +419,8 @@ static void subscription_dtor(void *obj)
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
        ast_cond_destroy(&sub->join_cond);
+
+       AST_VECTOR_FREE(&sub->accepted_message_types);
 }
 
 /*!
@@ -421,23 +432,29 @@ static void subscription_dtor(void *obj)
 static void subscription_invoke(struct stasis_subscription *sub,
                                  struct stasis_message *message)
 {
-       /* Notify that the final message has been received */
-       if (stasis_subscription_final_message(sub, message)) {
-               SCOPED_AO2LOCK(lock, sub);
+       unsigned int final = stasis_subscription_final_message(sub, message);
+       int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
 
+       /* Notify that the final message has been received */
+       if (final) {
+               ao2_lock(sub);
                sub->final_message_rxed = 1;
                ast_cond_signal(&sub->join_cond);
+               ao2_unlock(sub);
        }
 
-       /* Since sub is mostly immutable, no need to lock sub */
-       sub->callback(sub->data, sub, message);
+       if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
+               (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
+               /* Since sub is mostly immutable, no need to lock sub */
+               sub->callback(sub->data, sub, message);
+       }
 
        /* Notify that the final message has been processed */
-       if (stasis_subscription_final_message(sub, message)) {
-               SCOPED_AO2LOCK(lock, sub);
-
+       if (final) {
+               ao2_lock(sub);
                sub->final_message_processed = 1;
                ast_cond_signal(&sub->join_cond);
+               ao2_unlock(sub);
        }
 }
 
@@ -455,31 +472,40 @@ struct stasis_subscription *internal_stasis_subscribe(
        int needs_mailbox,
        int use_thread_pool)
 {
-       RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+       struct stasis_subscription *sub;
 
        if (!topic) {
                return NULL;
        }
 
        /* The ao2 lock is used for join_cond. */
-       sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
+       sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
        if (!sub) {
                return NULL;
        }
        ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
        if (needs_mailbox) {
-               /* With a small number of subscribers, a thread-per-sub is
-                * acceptable. For larger number of subscribers, a thread
+               char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+               /* Create name with seq number appended. */
+               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+                       use_thread_pool ? 'p' : 'm',
+                       stasis_topic_name(topic));
+
+               /*
+                * With a small number of subscribers, a thread-per-sub is
+                * acceptable. For a large number of subscribers, a thread
                 * pool should be used.
                 */
                if (use_thread_pool) {
-                       sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+                       sub->mailbox = ast_threadpool_serializer(tps_name, pool);
                } else {
-                       sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
-                               TPS_REF_DEFAULT);
+                       sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
                }
                if (!sub->mailbox) {
+                       ao2_ref(sub, -1);
+
                        return NULL;
                }
                ast_taskprocessor_set_local(sub->mailbox, sub);
@@ -492,13 +518,16 @@ struct stasis_subscription *internal_stasis_subscribe(
        sub->callback = callback;
        sub->data = data;
        ast_cond_init(&sub->join_cond, NULL);
+       sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
+       AST_VECTOR_INIT(&sub->accepted_message_types, 0);
 
        if (topic_add_subscription(topic, sub) != 0) {
+               ao2_ref(sub, -1);
+
                return NULL;
        }
        send_subscription_subscribe(topic, sub);
 
-       ao2_ref(sub, +1);
        return sub;
 }
 
@@ -529,18 +558,21 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
        /* The subscription may be the last ref to this topic. Hold
         * the topic ref open until after the unlock. */
-       RAII_VAR(struct stasis_topic *, topic,
-               ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
+       struct stasis_topic *topic;
 
        if (!sub) {
                return NULL;
        }
 
+       topic = ao2_bump(sub->topic);
+
        /* We have to remove the subscription first, to ensure the unsubscribe
         * is the final message */
        if (topic_remove_subscription(sub->topic, sub) != 0) {
                ast_log(LOG_ERROR,
                        "Internal error: subscription has invalid topic\n");
+               ao2_cleanup(topic);
+
                return NULL;
        }
 
@@ -549,33 +581,124 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 
        /* When all that's done, remove the ref the mailbox has on the sub */
        if (sub->mailbox) {
-               ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+               if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
+                       /* Nothing we can do here, the conditional is just to keep
+                        * the compiler happy that we're not ignoring the result. */
+               }
        }
 
        /* Unsubscribing unrefs the subscription */
        ao2_cleanup(sub);
+       ao2_cleanup(topic);
+
        return NULL;
 }
 
-void stasis_subscription_join(struct stasis_subscription *subscription)
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+       long low_water, long high_water)
 {
+       int res = -1;
+
        if (subscription) {
-               SCOPED_AO2LOCK(lock, subscription);
+               res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
+                       low_water, high_water);
+       }
+       return res;
+}
+
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               /* Filtering is unreliable as this message type is not yet initialized
+                * so force all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
+               /* We do this for the same reason as above. The subscription can still operate, so allow
+                * it to do so by forcing all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
+               /* The memory is already allocated so this can't fail */
+               AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ao2_lock(subscription->topic);
+       if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
+               subscription->filter = filter;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
 
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+       if (subscription) {
+               ao2_lock(subscription);
                /* Wait until the processed flag has been set */
                while (!subscription->final_message_processed) {
                        ast_cond_wait(&subscription->join_cond,
                                ao2_object_get_lockaddr(subscription));
                }
+               ao2_unlock(subscription);
        }
 }
 
 int stasis_subscription_is_done(struct stasis_subscription *subscription)
 {
        if (subscription) {
-               SCOPED_AO2LOCK(lock, subscription);
+               int ret;
+
+               ao2_lock(subscription);
+               ret = subscription->final_message_rxed;
+               ao2_unlock(subscription);
 
-               return subscription->final_message_rxed;
+               return ret;
        }
 
        /* Null subscription is about as done as you can get */
@@ -603,13 +726,15 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
        if (sub) {
                size_t i;
                struct stasis_topic *topic = sub->topic;
-               SCOPED_AO2LOCK(lock_topic, topic);
 
+               ao2_lock(topic);
                for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
                        if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
+                               ao2_unlock(topic);
                                return 1;
                        }
                }
+               ao2_unlock(topic);
        }
 
        return 0;
@@ -650,8 +775,8 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
        size_t idx;
-       SCOPED_AO2LOCK(lock, topic);
 
+       ao2_lock(topic);
        /* The reference from the topic to the subscription is shared with
         * the owner of the subscription, which will explicitly unsubscribe
         * to release it.
@@ -664,6 +789,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
                topic_add_subscription(
                        AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
        }
+       ao2_unlock(topic);
 
        return 0;
 }
@@ -671,15 +797,18 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
        size_t idx;
-       SCOPED_AO2LOCK(lock_topic, topic);
+       int res;
 
+       ao2_lock(topic);
        for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
                topic_remove_subscription(
                        AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
        }
-
-       return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
+       res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
                AST_VECTOR_ELEM_CLEANUP_NOOP);
+       ao2_unlock(topic);
+
+       return res;
 }
 
 /*!
@@ -742,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub,
        struct stasis_message *message,
        int synchronous)
 {
+       /* Determine if this subscription is interested in this message. Note that final
+        * messages are special and are always invoked on the subscription.
+        */
+       if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
+               int message_type_id = stasis_message_type_id(stasis_message_type(message));
+               if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
+                       !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
+                       !stasis_subscription_final_message(sub, message)) {
+                       return;
+               }
+       }
+
        if (!sub->mailbox) {
                /* Dispatch directly */
                subscription_invoke(sub, message);
@@ -801,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic,
        ast_assert(topic != NULL);
        ast_assert(message != NULL);
 
+       /* If there are no subscribers don't bother */
+       if (!stasis_topic_subscribers(topic)) {
+               return;
+       }
+
        /*
         * The topic may be unref'ed by the subscription invocation.
         * Make sure we hold onto a reference while dispatching.
@@ -890,7 +1036,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
 {
        int res;
        size_t idx;
-       RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+       struct stasis_forward *forward;
 
        if (!from_topic || !to_topic) {
                return NULL;
@@ -903,7 +1049,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
 
        /* Forwards to ourselves are implicit. */
        if (to_topic == from_topic) {
-               return ao2_bump(forward);
+               return forward;
        }
 
        forward->from_topic = ao2_bump(from_topic);
@@ -914,6 +1060,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
        if (res != 0) {
                ao2_unlock(from_topic);
                ao2_unlock(to_topic);
+               ao2_ref(forward, -1);
                return NULL;
        }
 
@@ -923,7 +1070,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
        ao2_unlock(from_topic);
        ao2_unlock(to_topic);
 
-       return ao2_bump(forward);
+       return forward;
 }
 
 static void subscription_change_dtor(void *obj)
@@ -1042,6 +1189,15 @@ static void topic_pool_dtor(void *obj)
 {
        struct stasis_topic_pool *pool = obj;
 
+#ifdef AO2_DEBUG
+       {
+               char *container_name =
+                       ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
+               sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
+               ao2_container_unregister(container_name);
+       }
+#endif
+
        ao2_cleanup(pool->pool_container);
        pool->pool_container = NULL;
        ao2_cleanup(pool->pool_topic);
@@ -1106,6 +1262,18 @@ static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
        return CMP_MATCH;
 }
 
+#ifdef AO2_DEBUG
+static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
+{
+       struct topic_pool_entry *entry = v_obj;
+
+       if (!entry) {
+               return;
+       }
+       prnt(where, "%s", stasis_topic_name(entry->topic));
+}
+#endif
+
 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
 {
        struct stasis_topic_pool *pool;
@@ -1121,12 +1289,27 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t
                ao2_cleanup(pool);
                return NULL;
        }
+
+#ifdef AO2_DEBUG
+       {
+               char *container_name =
+                       ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
+               sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
+               ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
+       }
+#endif
+
        ao2_ref(pooled_topic, +1);
        pool->pool_topic = pooled_topic;
 
        return pool;
 }
 
+void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
+{
+       ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
+}
+
 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
 {
        RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
@@ -1159,10 +1342,25 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
        return topic_pool_entry->topic;
 }
 
+int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
+{
+       struct topic_pool_entry *topic_pool_entry;
+
+       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
+       if (!topic_pool_entry) {
+               return 0;
+       }
+
+       ao2_ref(topic_pool_entry, -1);
+       return 1;
+}
+
 void stasis_log_bad_type_access(const char *name)
 {
 #ifdef AST_DEVMODE
-       ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
+       if (!stasis_message_type_declined(name)) {
+               ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
+       }
 #endif
 }
 
@@ -1195,25 +1393,25 @@ static void multi_object_blob_dtor(void *obj)
 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
 {
        int type;
-       RAII_VAR(struct ast_multi_object_blob *, multi,
-                       ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
-                       ao2_cleanup);
+       struct ast_multi_object_blob *multi;
 
        ast_assert(blob != NULL);
 
+       multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
        if (!multi) {
                return NULL;
        }
 
        for (type = 0; type < STASIS_UMOS_MAX; ++type) {
                if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
+                       ao2_ref(multi, -1);
+
                        return NULL;
                }
        }
 
        multi->blob = ast_json_ref(blob);
 
-       ao2_ref(multi, +1);
        return multi;
 }
 
@@ -1221,19 +1419,18 @@ struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob
 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
        enum stasis_user_multi_object_snapshot_type type, void *object)
 {
-       if (!multi || !object) {
-               return;
+       if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
+               ao2_cleanup(object);
        }
-       AST_VECTOR_APPEND(&multi->snapshots[type],object);
 }
 
 /*! \brief Publish single channel user event (for app_userevent compatibility) */
 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
        struct stasis_message_type *type, struct ast_json *blob)
 {
-       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
-       RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
-       RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
+       struct stasis_message *message;
+       struct ast_channel_snapshot *channel_snapshot;
+       struct ast_multi_object_blob *multi;
 
        if (!type) {
                return;
@@ -1245,13 +1442,20 @@ void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
        }
 
        channel_snapshot = ast_channel_snapshot_create(chan);
-       ao2_ref(channel_snapshot, +1);
+       if (!channel_snapshot) {
+               ao2_ref(multi, -1);
+               return;
+       }
+
+       /* this call steals the channel_snapshot reference */
        ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
 
        message = stasis_message_create(type, multi);
+       ao2_ref(multi, -1);
        if (message) {
                /* app_userevent still publishes to channel */
                stasis_publish(ast_channel_topic(chan), message);
+               ao2_ref(message, -1);
        }
 }
 
@@ -1260,7 +1464,7 @@ static struct ast_json *multi_user_event_to_json(
        struct stasis_message *message,
        const struct stasis_message_sanitizer *sanitize)
 {
-       RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
+       struct ast_json *out;
        struct ast_multi_object_blob *multi = stasis_message_data(message);
        struct ast_json *blob = multi->blob;
        const struct timeval *tv = stasis_message_timestamp(message);
@@ -1274,8 +1478,8 @@ static struct ast_json *multi_user_event_to_json(
 
        ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
        ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
-       ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
-       ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
+       ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
+       ast_json_object_set(out, "userevent", ast_json_ref(blob));
 
        for (type = 0; type < STASIS_UMOS_MAX; ++type) {
                for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
@@ -1302,7 +1506,8 @@ static struct ast_json *multi_user_event_to_json(
                        }
                }
        }
-       return ast_json_ref(out);
+
+       return out;
 }
 
 /*! \internal \brief convert multi object blob to ami string */
@@ -1324,7 +1529,7 @@ static struct ast_str *multi_object_blob_to_ami(void *obj)
 
        for (type = 0; type < STASIS_UMOS_MAX; ++type) {
                for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
-                       char *name = "";
+                       char *name = NULL;
                        void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
                        ami_snapshot = NULL;
 
@@ -1334,11 +1539,11 @@ static struct ast_str *multi_object_blob_to_ami(void *obj)
 
                        switch (type) {
                        case STASIS_UMOS_CHANNEL:
-                               ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
+                               ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
                                break;
 
                        case STASIS_UMOS_BRIDGE:
-                               ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
+                               ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
                                break;
 
                        case STASIS_UMOS_ENDPOINT:
@@ -1349,6 +1554,7 @@ static struct ast_str *multi_object_blob_to_ami(void *obj)
                                ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
                                ast_free(ami_snapshot);
                        }
+                       ast_free(name);
                }
        }
 
@@ -1415,8 +1621,8 @@ static struct aco_type threadpool_option = {
        .type = ACO_GLOBAL,
        .name = "threadpool",
        .item_offset = offsetof(struct stasis_config, threadpool_options),
-       .category = "^threadpool$",
-       .category_match = ACO_WHITELIST,
+       .category = "threadpool",
+       .category_match = ACO_WHITELIST_EXACT,
 };
 
 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
@@ -1426,8 +1632,8 @@ static struct aco_type declined_option = {
        .type = ACO_GLOBAL,
        .name = "declined_message_types",
        .item_offset = offsetof(struct stasis_config, declined_message_types),
-       .category_match = ACO_WHITELIST,
-       .category = "^declined_message_types$",
+       .category_match = ACO_WHITELIST_EXACT,
+       .category = "declined_message_types",
 };
 
 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
@@ -1494,17 +1700,19 @@ static void *stasis_config_alloc(void)
 
 int stasis_message_type_declined(const char *name)
 {
-       RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
+       struct stasis_config *cfg = ao2_global_obj_ref(globals);
        char *name_in_declined;
        int res;
 
        if (!cfg || !cfg->declined_message_types) {
+               ao2_cleanup(cfg);
                return 0;
        }
 
        name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
        res = name_in_declined ? 1 : 0;
        ao2_cleanup(name_in_declined);
+       ao2_ref(cfg, -1);
        if (res) {
                ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
        }
@@ -1550,7 +1758,7 @@ static void stasis_cleanup(void)
 
 int stasis_init(void)
 {
-       RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
+       struct stasis_config *cfg;
        int cache_init;
        struct ast_threadpool_options threadpool_opts = { 0, };
 
@@ -1586,11 +1794,14 @@ int stasis_init(void)
                if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
                        ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
                        ao2_ref(default_cfg, -1);
+
                        return -1;
                }
 
                if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
                        ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
+                       ao2_ref(default_cfg, -1);
+
                        return -1;
                }
 
@@ -1601,6 +1812,7 @@ int stasis_init(void)
                cfg = ao2_global_obj_ref(globals);
                if (!cfg) {
                        ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+
                        return -1;
                }
        }
@@ -1611,8 +1823,10 @@ int stasis_init(void)
        threadpool_opts.max_size = cfg->threadpool_options->max_size;
        threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
        pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+       ao2_ref(cfg, -1);
        if (!pool) {
                ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+
                return -1;
        }
 
@@ -1630,4 +1844,3 @@ int stasis_init(void)
 
        return 0;
 }
-