stasis: Add internal filtering of messages.
[asterisk/asterisk.git] / main / stasis.c
index d776979..93112d9 100644 (file)
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
 #include "asterisk/astobj2.h"
+#include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
+#include "asterisk/vector.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_bridges.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/config_options.h"
+
+/*** DOCUMENTATION
+       <managerEvent language="en_US" name="UserEvent">
+               <managerEventInstance class="EVENT_FLAG_USER">
+                       <synopsis>A user defined event raised from the dialplan.</synopsis>
+                       <syntax>
+                               <channel_snapshot/>
+                               <parameter name="UserEvent">
+                                       <para>The event name, as specified in the dialplan.</para>
+                               </parameter>
+                       </syntax>
+                       <description>
+                               <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
+                       </description>
+                       <see-also>
+                               <ref type="application">UserEvent</ref>
+                               <ref type="managerEvent">UserEvent</ref>
+                       </see-also>
+               </managerEventInstance>
+       </managerEvent>
+       <configInfo name="stasis" language="en_US">
+               <configFile name="stasis.conf">
+                       <configObject name="threadpool">
+                               <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+                               <configOption name="initial_size" default="5">
+                                       <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+                               </configOption>
+                               <configOption name="idle_timeout_sec" default="20">
+                                       <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+                               </configOption>
+                               <configOption name="max_size" default="50">
+                                       <synopsis>Maximum number of threads in the threadpool.</synopsis>
+                               </configOption>
+                       </configObject>
+                       <configObject name="declined_message_types">
+                               <synopsis>Stasis message types for which to decline creation.</synopsis>
+                               <configOption name="decline">
+                                       <synopsis>The message type to decline.</synopsis>
+                                       <description>
+                                               <para>This configuration option defines the name of the Stasis
+                                               message type that Asterisk is forbidden from creating and can be
+                                               specified as many times as necessary to achieve the desired result.</para>
+                                               <enumlist>
+                                                       <enum name="stasis_app_recording_snapshot_type" />
+                                                       <enum name="stasis_app_playback_snapshot_type" />
+                                                       <enum name="stasis_test_message_type" />
+                                                       <enum name="confbridge_start_type" />
+                                                       <enum name="confbridge_end_type" />
+                                                       <enum name="confbridge_join_type" />
+                                                       <enum name="confbridge_leave_type" />
+                                                       <enum name="confbridge_start_record_type" />
+                                                       <enum name="confbridge_stop_record_type" />
+                                                       <enum name="confbridge_mute_type" />
+                                                       <enum name="confbridge_unmute_type" />
+                                                       <enum name="confbridge_talking_type" />
+                                                       <enum name="cel_generic_type" />
+                                                       <enum name="ast_bridge_snapshot_type" />
+                                                       <enum name="ast_bridge_merge_message_type" />
+                                                       <enum name="ast_channel_entered_bridge_type" />
+                                                       <enum name="ast_channel_left_bridge_type" />
+                                                       <enum name="ast_blind_transfer_type" />
+                                                       <enum name="ast_attended_transfer_type" />
+                                                       <enum name="ast_endpoint_snapshot_type" />
+                                                       <enum name="ast_endpoint_state_type" />
+                                                       <enum name="ast_device_state_message_type" />
+                                                       <enum name="ast_test_suite_message_type" />
+                                                       <enum name="ast_mwi_state_type" />
+                                                       <enum name="ast_mwi_vm_app_type" />
+                                                       <enum name="ast_format_register_type" />
+                                                       <enum name="ast_format_unregister_type" />
+                                                       <enum name="ast_manager_get_generic_type" />
+                                                       <enum name="ast_parked_call_type" />
+                                                       <enum name="ast_channel_snapshot_type" />
+                                                       <enum name="ast_channel_dial_type" />
+                                                       <enum name="ast_channel_varset_type" />
+                                                       <enum name="ast_channel_hangup_request_type" />
+                                                       <enum name="ast_channel_dtmf_begin_type" />
+                                                       <enum name="ast_channel_dtmf_end_type" />
+                                                       <enum name="ast_channel_hold_type" />
+                                                       <enum name="ast_channel_unhold_type" />
+                                                       <enum name="ast_channel_chanspy_start_type" />
+                                                       <enum name="ast_channel_chanspy_stop_type" />
+                                                       <enum name="ast_channel_fax_type" />
+                                                       <enum name="ast_channel_hangup_handler_type" />
+                                                       <enum name="ast_channel_moh_start_type" />
+                                                       <enum name="ast_channel_moh_stop_type" />
+                                                       <enum name="ast_channel_monitor_start_type" />
+                                                       <enum name="ast_channel_monitor_stop_type" />
+                                                       <enum name="ast_channel_agent_login_type" />
+                                                       <enum name="ast_channel_agent_logoff_type" />
+                                                       <enum name="ast_channel_talking_start" />
+                                                       <enum name="ast_channel_talking_stop" />
+                                                       <enum name="ast_security_event_type" />
+                                                       <enum name="ast_named_acl_change_type" />
+                                                       <enum name="ast_local_bridge_type" />
+                                                       <enum name="ast_local_optimization_begin_type" />
+                                                       <enum name="ast_local_optimization_end_type" />
+                                                       <enum name="stasis_subscription_change_type" />
+                                                       <enum name="ast_multi_user_event_type" />
+                                                       <enum name="stasis_cache_clear_type" />
+                                                       <enum name="stasis_cache_update_type" />
+                                                       <enum name="ast_network_change_type" />
+                                                       <enum name="ast_system_registry_type" />
+                                                       <enum name="ast_cc_available_type" />
+                                                       <enum name="ast_cc_offertimerstart_type" />
+                                                       <enum name="ast_cc_requested_type" />
+                                                       <enum name="ast_cc_requestacknowledged_type" />
+                                                       <enum name="ast_cc_callerstopmonitoring_type" />
+                                                       <enum name="ast_cc_callerstartmonitoring_type" />
+                                                       <enum name="ast_cc_callerrecalling_type" />
+                                                       <enum name="ast_cc_recallcomplete_type" />
+                                                       <enum name="ast_cc_failure_type" />
+                                                       <enum name="ast_cc_monitorfailed_type" />
+                                                       <enum name="ast_presence_state_message_type" />
+                                                       <enum name="ast_rtp_rtcp_sent_type" />
+                                                       <enum name="ast_rtp_rtcp_received_type" />
+                                                       <enum name="ast_call_pickup_type" />
+                                                       <enum name="aoc_s_type" />
+                                                       <enum name="aoc_d_type" />
+                                                       <enum name="aoc_e_type" />
+                                                       <enum name="dahdichannel_type" />
+                                                       <enum name="mcid_type" />
+                                                       <enum name="session_timeout_type" />
+                                                       <enum name="cdr_read_message_type" />
+                                                       <enum name="cdr_write_message_type" />
+                                                       <enum name="cdr_prop_write_message_type" />
+                                                       <enum name="corosync_ping_message_type" />
+                                                       <enum name="agi_exec_start_type" />
+                                                       <enum name="agi_exec_end_type" />
+                                                       <enum name="agi_async_start_type" />
+                                                       <enum name="agi_async_exec_type" />
+                                                       <enum name="agi_async_end_type" />
+                                                       <enum name="queue_caller_join_type" />
+                                                       <enum name="queue_caller_leave_type" />
+                                                       <enum name="queue_caller_abandon_type" />
+                                                       <enum name="queue_member_status_type" />
+                                                       <enum name="queue_member_added_type" />
+                                                       <enum name="queue_member_removed_type" />
+                                                       <enum name="queue_member_pause_type" />
+                                                       <enum name="queue_member_penalty_type" />
+                                                       <enum name="queue_member_ringinuse_type" />
+                                                       <enum name="queue_agent_called_type" />
+                                                       <enum name="queue_agent_connect_type" />
+                                                       <enum name="queue_agent_complete_type" />
+                                                       <enum name="queue_agent_dump_type" />
+                                                       <enum name="queue_agent_ringnoanswer_type" />
+                                                       <enum name="meetme_join_type" />
+                                                       <enum name="meetme_leave_type" />
+                                                       <enum name="meetme_end_type" />
+                                                       <enum name="meetme_mute_type" />
+                                                       <enum name="meetme_talking_type" />
+                                                       <enum name="meetme_talk_request_type" />
+                                                       <enum name="appcdr_message_type" />
+                                                       <enum name="forkcdr_message_type" />
+                                                       <enum name="cdr_sync_message_type" />
+                                               </enumlist>
+                                       </description>
+                               </configOption>
+                       </configObject>
+               </configFile>
+       </configInfo>
+***/
+
+/*!
+ * \page stasis-impl Stasis Implementation Notes
+ *
+ * \par Reference counting
+ *
+ * Stasis introduces a number of objects, which are tightly related to one
+ * another. Because we rely on ref-counting for memory management, understanding
+ * these relationships is important to understanding this code.
+ *
+ * \code{.txt}
+ *
+ *   stasis_topic <----> stasis_subscription
+ *             ^          ^
+ *              \        /
+ *               \      /
+ *               dispatch
+ *                  |
+ *                  |
+ *                  v
+ *            stasis_message
+ *                  |
+ *                  |
+ *                  v
+ *          stasis_message_type
+ *
+ * \endcode
+ *
+ * The most troubling thing in this chart is the cyclic reference between
+ * stasis_topic and stasis_subscription. This is both unfortunate, and
+ * necessary. Topics need the subscription in order to dispatch messages;
+ * subscriptions need the topics to unsubscribe and check subscription status.
+ *
+ * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
+ * topic's reference to a subscription. When the subcription is destroyed, it
+ * will remove its reference to the topic.
+ *
+ * This means that until a subscription has be explicitly unsubscribed, it will
+ * not be destroyed. Neither will a topic be destroyed while it has subscribers.
+ * The destructors of both have assertions regarding this to catch ref-counting
+ * problems where a subscription or topic has had an extra ao2_cleanup().
+ *
+ * The \ref dispatch object is a transient object, which is posted to a
+ * subscription's taskprocessor to send a message to the subscriber. They have
+ * short life cycles, allocated on one thread, destroyed on another.
+ *
+ * During shutdown, or the deletion of a domain object, there are a flurry of
+ * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
+ * are processed. Any one of these cleanups could be the one to actually destroy
+ * a given object, so care must be taken to ensure that an object isn't
+ * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
+ * that might happen when a RAII_VAR() goes out of scope.
+ *
+ * \par Typical life cycles
+ *
+ *  \li stasis_topic - There are several topics which live for the duration of
+ *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
+ *      are actually fed by shorter-lived topics whose lifetime is associated
+ *      with some domain object (like ast_channel_topic() for a given
+ *      ast_channel).
+ *
+ *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
+ *      topics, for similar reasons.
+ *
+ *  \li dispatch - Very short lived; just long enough to post a message to a
+ *      subscriber.
+ *
+ *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
+ *      irrelevant. Messages are strictly data and have no behavior associated
+ *      with them, so it doesn't really matter if/when they are destroyed. By
+ *      design, a component could hold a ref to a message forever without any
+ *      ill consequences (aside from consuming more memory).
+ *
+ *  \li stasis_message_type - Long life cycles, typically only destroyed on
+ *      module unloading or _clean_ process exit.
+ *
+ * \par Subscriber shutdown sequencing
+ *
+ * Subscribers are sensitive to shutdown sequencing, specifically in how the
+ * reference message types. This is fully detailed on the wiki at
+ * https://wiki.asterisk.org/wiki/x/K4BqAQ.
+ *
+ * In short, the lifetime of the \a data (and \a callback, if in a module) must
+ * be held until the stasis_subscription_final_message() has been received.
+ * Depending on the structure of the subscriber code, this can be handled by
+ * using stasis_subscription_final_message() to free resources on the final
+ * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
+ * block until the unsubscribe has completed.
+ */
 
 /*! Initial size of the subscribers list. */
 #define INITIAL_SUBSCRIBERS_MAX 4
@@ -44,57 +299,69 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
-/*! Threadpool for dispatching notifications to subscribers */
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
 static struct ast_threadpool *pool;
 
-static struct stasis_message_type *__subscription_change_message_type;
+STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
 struct stasis_topic {
        char *name;
-       /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
-       struct stasis_subscription **subscribers;
-       /*! Allocated length of the subscribers array */
-       size_t num_subscribers_max;
-       /*! Current size of the subscribers array */
-       size_t num_subscribers_current;
+       /*! Variable length array of the subscribers */
+       AST_VECTOR(, struct stasis_subscription *) subscribers;
+
+       /*! Topics forwarding into this topic */
+       AST_VECTOR(, struct stasis_topic *) upstream_topics;
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
-struct stasis_subscription;
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+       struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+
+/*! \brief Lock two topics. */
+#define topic_lock_both(topic1, topic2) \
+       do { \
+               ao2_lock(topic1); \
+               while (ao2_trylock(topic2)) { \
+                       AO2_DEADLOCK_AVOIDANCE(topic1); \
+               } \
+       } while (0)
 
 static void topic_dtor(void *obj)
 {
        struct stasis_topic *topic = obj;
+
+       /* Subscribers hold a reference to topics, so they should all be
+        * unsubscribed before we get here. */
+       ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
+
        ast_free(topic->name);
        topic->name = NULL;
-       ast_free(topic->subscribers);
-       topic->subscribers = NULL;
+
+       AST_VECTOR_FREE(&topic->subscribers);
+       AST_VECTOR_FREE(&topic->upstream_topics);
 }
 
 struct stasis_topic *stasis_topic_create(const char *name)
 {
-       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-
-       topic = ao2_alloc(sizeof(*topic), topic_dtor);
+       struct stasis_topic *topic;
+       int res = 0;
 
+       topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
        if (!topic) {
                return NULL;
        }
 
        topic->name = ast_strdup(name);
-       if (!topic->name) {
-               return NULL;
-       }
-
-       topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
-       topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
-       if (!topic->subscribers) {
+       res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+       res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
+       if (!topic->name || res) {
+               ao2_cleanup(topic);
                return NULL;
        }
 
-       ao2_ref(topic, +1);
        return topic;
 }
 
@@ -103,10 +370,15 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
        return topic->name;
 }
 
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+       return AST_VECTOR_SIZE(&topic->subscribers);
+}
+
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
-       char *uniqueid;
+       char uniqueid[AST_UUID_STR_LEN];
        /*! Topic subscribed to. */
        struct stasis_topic *topic;
        /*! Mailbox for processing incoming messages. */
@@ -115,80 +387,337 @@ struct stasis_subscription {
        stasis_subscription_cb callback;
        /*! Data pointer to be handed to the callback. */
        void *data;
+
+       /*! Condition for joining with subscription. */
+       ast_cond_t join_cond;
+       /*! Flag set when final message for sub has been received.
+        *  Be sure join_lock is held before reading/setting. */
+       int final_message_rxed;
+       /*! Flag set when final message for sub has been processed.
+        *  Be sure join_lock is held before reading/setting. */
+       int final_message_processed;
+
+       /*! The message types this subscription is accepting */
+       AST_VECTOR(, char) accepted_message_types;
+       /*! The message filter currently in use */
+       enum stasis_subscription_message_filter filter;
 };
 
 static void subscription_dtor(void *obj)
 {
        struct stasis_subscription *sub = obj;
+
+       /* Subscriptions need to be manually unsubscribed before destruction
+        * b/c there's a cyclic reference between topics and subscriptions */
        ast_assert(!stasis_subscription_is_subscribed(sub));
-       ast_free(sub->uniqueid);
-       sub->uniqueid = NULL;
+       /* If there are any messages in flight to this subscription; that would
+        * be bad. */
+       ast_assert(stasis_subscription_is_done(sub));
+
        ao2_cleanup(sub->topic);
        sub->topic = NULL;
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
+       ast_cond_destroy(&sub->join_cond);
+
+       AST_VECTOR_FREE(&sub->accepted_message_types);
+}
+
+/*!
+ * \brief Invoke the subscription's callback.
+ * \param sub Subscription to invoke.
+ * \param topic Topic message was published to.
+ * \param message Message to send.
+ */
+static void subscription_invoke(struct stasis_subscription *sub,
+                                 struct stasis_message *message)
+{
+       unsigned int final = stasis_subscription_final_message(sub, message);
+       int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+
+       /* Notify that the final message has been received */
+       if (final) {
+               ao2_lock(sub);
+               sub->final_message_rxed = 1;
+               ast_cond_signal(&sub->join_cond);
+               ao2_unlock(sub);
+       }
+
+       if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
+               (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
+               /* Since sub is mostly immutable, no need to lock sub */
+               sub->callback(sub->data, sub, message);
+       }
+
+       /* Notify that the final message has been processed */
+       if (final) {
+               ao2_lock(sub);
+               sub->final_message_processed = 1;
+               ast_cond_signal(&sub->join_cond);
+               ao2_unlock(sub);
+       }
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
 
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
-       RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
-       RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
-       char uniqueid[AST_UUID_STR_LEN];
+}
 
-       sub = ao2_alloc(sizeof(*sub), subscription_dtor);
-       if (!sub) {
+struct stasis_subscription *internal_stasis_subscribe(
+       struct stasis_topic *topic,
+       stasis_subscription_cb callback,
+       void *data,
+       int needs_mailbox,
+       int use_thread_pool)
+{
+       struct stasis_subscription *sub;
+
+       if (!topic) {
                return NULL;
        }
 
-       id = ast_uuid_generate();
-       if (!id) {
-               ast_log(LOG_ERROR, "UUID generation failed\n");
+       /* The ao2 lock is used for join_cond. */
+       sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
+       if (!sub) {
                return NULL;
        }
-       ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
+       ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
+
+       if (needs_mailbox) {
+               char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+               /* Create name with seq number appended. */
+               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+                       use_thread_pool ? 'p' : 'm',
+                       stasis_topic_name(topic));
+
+               /*
+                * With a small number of subscribers, a thread-per-sub is
+                * acceptable. For a large number of subscribers, a thread
+                * pool should be used.
+                */
+               if (use_thread_pool) {
+                       sub->mailbox = ast_threadpool_serializer(tps_name, pool);
+               } else {
+                       sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
+               }
+               if (!sub->mailbox) {
+                       ao2_ref(sub, -1);
 
-       sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
-       if (!sub->mailbox) {
-               return NULL;
+                       return NULL;
+               }
+               ast_taskprocessor_set_local(sub->mailbox, sub);
+               /* Taskprocessor has a reference */
+               ao2_ref(sub, +1);
        }
 
-       sub->uniqueid = ast_strdup(uniqueid);
        ao2_ref(topic, +1);
        sub->topic = topic;
        sub->callback = callback;
        sub->data = data;
+       ast_cond_init(&sub->join_cond, NULL);
+       sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
+       AST_VECTOR_INIT(&sub->accepted_message_types, 0);
 
        if (topic_add_subscription(topic, sub) != 0) {
+               ao2_ref(sub, -1);
+
                return NULL;
        }
-       send_subscription_change_message(topic, uniqueid, "Subscribe");
+       send_subscription_subscribe(topic, sub);
 
-       ao2_ref(sub, +1);
        return sub;
 }
 
+struct stasis_subscription *stasis_subscribe(
+       struct stasis_topic *topic,
+       stasis_subscription_cb callback,
+       void *data)
+{
+       return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+       struct stasis_topic *topic,
+       stasis_subscription_cb callback,
+       void *data)
+{
+       return internal_stasis_subscribe(topic, callback, data, 1, 1);
+}
+
+static int sub_cleanup(void *data)
+{
+       struct stasis_subscription *sub = data;
+       ao2_cleanup(sub);
+       return 0;
+}
+
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
-       if (sub) {
-               size_t i;
-               struct stasis_topic *topic = sub->topic;
-               SCOPED_AO2LOCK(lock_topic, topic);
-
-               for (i = 0; i < topic->num_subscribers_current; ++i) {
-                       if (topic->subscribers[i] == sub) {
-                               send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
-                               /* swap [i] with last entry; remove last entry */
-                               topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
-                               /* Unsubscribing unrefs the subscription */
-                               ao2_cleanup(sub);
-                               return NULL;
-                       }
+       /* The subscription may be the last ref to this topic. Hold
+        * the topic ref open until after the unlock. */
+       struct stasis_topic *topic;
+
+       if (!sub) {
+               return NULL;
+       }
+
+       topic = ao2_bump(sub->topic);
+
+       /* We have to remove the subscription first, to ensure the unsubscribe
+        * is the final message */
+       if (topic_remove_subscription(sub->topic, sub) != 0) {
+               ast_log(LOG_ERROR,
+                       "Internal error: subscription has invalid topic\n");
+               ao2_cleanup(topic);
+
+               return NULL;
+       }
+
+       /* Now let everyone know about the unsubscribe */
+       send_subscription_unsubscribe(topic, sub);
+
+       /* When all that's done, remove the ref the mailbox has on the sub */
+       if (sub->mailbox) {
+               if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
+                       /* Nothing we can do here, the conditional is just to keep
+                        * the compiler happy that we're not ignoring the result. */
                }
+       }
+
+       /* Unsubscribing unrefs the subscription */
+       ao2_cleanup(sub);
+       ao2_cleanup(topic);
+
+       return NULL;
+}
+
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+       long low_water, long high_water)
+{
+       int res = -1;
+
+       if (subscription) {
+               res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
+                       low_water, high_water);
+       }
+       return res;
+}
+
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               /* Filtering is unreliable as this message type is not yet initialized
+                * so force all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+               return 0;
+       }
 
-               ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+       ao2_lock(subscription->topic);
+       if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
+               /* We do this for the same reason as above. The subscription can still operate, so allow
+                * it to do so by forcing all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
        }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
+               /* The memory is already allocated so this can't fail */
+               AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ao2_lock(subscription->topic);
+       if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
+               subscription->filter = filter;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+       if (subscription) {
+               ao2_lock(subscription);
+               /* Wait until the processed flag has been set */
+               while (!subscription->final_message_processed) {
+                       ast_cond_wait(&subscription->join_cond,
+                               ao2_object_get_lockaddr(subscription));
+               }
+               ao2_unlock(subscription);
+       }
+}
+
+int stasis_subscription_is_done(struct stasis_subscription *subscription)
+{
+       if (subscription) {
+               int ret;
+
+               ao2_lock(subscription);
+               ret = subscription->final_message_rxed;
+               ao2_unlock(subscription);
+
+               return ret;
+       }
+
+       /* Null subscription is about as done as you can get */
+       return 1;
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+       struct stasis_subscription *subscription)
+{
+       if (!subscription) {
+               return NULL;
+       }
+
+       /* Bump refcount to hold it past the unsubscribe */
+       ao2_ref(subscription, +1);
+       stasis_unsubscribe(subscription);
+       stasis_subscription_join(subscription);
+       /* Now decrement the refcount back */
+       ao2_cleanup(subscription);
        return NULL;
 }
 
@@ -197,13 +726,15 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
        if (sub) {
                size_t i;
                struct stasis_topic *topic = sub->topic;
-               SCOPED_AO2LOCK(lock_topic, topic);
 
-               for (i = 0; i < topic->num_subscribers_current; ++i) {
-                       if (topic->subscribers[i] == sub) {
+               ao2_lock(topic);
+               for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
+                       if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
+                               ao2_unlock(topic);
                                return 1;
                        }
                }
+               ao2_unlock(topic);
        }
 
        return 0;
@@ -217,7 +748,8 @@ const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct stasis_subscription_change *change;
-       if (stasis_message_type(msg) != stasis_subscription_change()) {
+
+       if (stasis_message_type(msg) != stasis_subscription_change_type()) {
                return 0;
        }
 
@@ -242,165 +774,320 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
  */
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-       struct stasis_subscription **subscribers;
-       SCOPED_AO2LOCK(lock, topic);
-
-       /* Increase list size, if needed */
-       if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
-               subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
-               if (!subscribers) {
-                       return -1;
-               }
-               topic->subscribers = subscribers;
-               topic->num_subscribers_max *= 2;
+       size_t idx;
+
+       ao2_lock(topic);
+       /* The reference from the topic to the subscription is shared with
+        * the owner of the subscription, which will explicitly unsubscribe
+        * to release it.
+        *
+        * If we bumped the refcount here, the owner would have to unsubscribe
+        * and cleanup, which is a bit awkward. */
+       AST_VECTOR_APPEND(&topic->subscribers, sub);
+
+       for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
+               topic_add_subscription(
+                       AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
        }
+       ao2_unlock(topic);
 
-       /* Don't ref sub here or we'll cause a reference cycle. */
-       topic->subscribers[topic->num_subscribers_current++] = sub;
        return 0;
 }
 
-/*!
- * \internal
- * \brief Information needed to dispatch a message to a subscription
- */
-struct dispatch {
-       /*! Topic message was published to */
-       struct stasis_topic *topic;
-       /*! The message itself */
-       struct stasis_message *message;
-       /*! Subscription receiving the message */
-       struct stasis_subscription *sub;
-};
-
-static void dispatch_dtor(void *data)
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-       struct dispatch *dispatch = data;
-       ao2_cleanup(dispatch->topic);
-       ao2_cleanup(dispatch->message);
-       ao2_cleanup(dispatch->sub);
+       size_t idx;
+       int res;
+
+       ao2_lock(topic);
+       for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
+               topic_remove_subscription(
+                       AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
+       }
+       res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
+               AST_VECTOR_ELEM_CLEANUP_NOOP);
+       ao2_unlock(topic);
+
+       return res;
 }
 
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+/*!
+ * \internal \brief Dispatch a message to a subscriber asynchronously
+ * \param local \ref ast_taskprocessor_local object
+ * \return 0
+ */
+static int dispatch_exec_async(struct ast_taskprocessor_local *local)
 {
-       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+       struct stasis_subscription *sub = local->local_data;
+       struct stasis_message *message = local->data;
 
-       ast_assert(topic != NULL);
-       ast_assert(message != NULL);
-       ast_assert(sub != NULL);
+       subscription_invoke(sub, message);
+       ao2_cleanup(message);
 
-       dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
-       if (!dispatch) {
-               return NULL;
-       }
+       return 0;
+}
 
-       dispatch->topic = topic;
-       ao2_ref(topic, +1);
+/*!
+ * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
+ * a published message to a subscriber
+ */
+struct sync_task_data {
+       ast_mutex_t lock;
+       ast_cond_t cond;
+       int complete;
+       void *task_data;
+};
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber synchronously
+ * \param local \ref ast_taskprocessor_local object
+ * \return 0
+ */
+static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
+{
+       struct stasis_subscription *sub = local->local_data;
+       struct sync_task_data *std = local->data;
+       struct stasis_message *message = std->task_data;
 
-       dispatch->message = message;
-       ao2_ref(message, +1);
+       subscription_invoke(sub, message);
+       ao2_cleanup(message);
 
-       dispatch->sub = sub;
-       ao2_ref(sub, +1);
+       ast_mutex_lock(&std->lock);
+       std->complete = 1;
+       ast_cond_signal(&std->cond);
+       ast_mutex_unlock(&std->lock);
 
-       ao2_ref(dispatch, +1);
-       return dispatch;
+       return 0;
 }
 
 /*!
- * \brief Dispatch a message to a subscriber
- * \param data \ref dispatch object
- * \return 0
+ * \internal \brief Dispatch a message to a subscriber
+ * \param sub The subscriber to dispatch to
+ * \param message The message to send
+ * \param synchronous If non-zero, synchronize on the subscriber receiving
+ * the message
  */
-static int dispatch_exec(void *data)
+static void dispatch_message(struct stasis_subscription *sub,
+       struct stasis_message *message,
+       int synchronous)
 {
-       RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
-       RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
+       /* Determine if this subscription is interested in this message. Note that final
+        * messages are special and are always invoked on the subscription.
+        */
+       if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
+               int message_type_id = stasis_message_type_id(stasis_message_type(message));
+               if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
+                       !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
+                       !stasis_subscription_final_message(sub, message)) {
+                       return;
+               }
+       }
 
-       /* Since sub->topic doesn't change, no need to lock sub */
-       ast_assert(dispatch->sub->topic != NULL);
-       ao2_ref(dispatch->sub->topic, +1);
-       sub_topic = dispatch->sub->topic;
+       if (!sub->mailbox) {
+               /* Dispatch directly */
+               subscription_invoke(sub, message);
+               return;
+       }
+
+       /* Bump the message for the taskprocessor push. This will get de-ref'd
+        * by the task processor callback.
+        */
+       ao2_bump(message);
+       if (!synchronous) {
+               if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
+                       /* Push failed; ugh. */
+                       ast_log(LOG_ERROR, "Dropping async dispatch\n");
+                       ao2_cleanup(message);
+               }
+       } else {
+               struct sync_task_data std;
+
+               ast_mutex_init(&std.lock);
+               ast_cond_init(&std.cond, NULL);
+               std.complete = 0;
+               std.task_data = message;
+
+               if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
+                       /* Push failed; ugh. */
+                       ast_log(LOG_ERROR, "Dropping sync dispatch\n");
+                       ao2_cleanup(message);
+                       ast_mutex_destroy(&std.lock);
+                       ast_cond_destroy(&std.cond);
+                       return;
+               }
 
-       dispatch->sub->callback(dispatch->sub->data,
-                               dispatch->sub,
-                               sub_topic,
-                               dispatch->message);
+               ast_mutex_lock(&std.lock);
+               while (!std.complete) {
+                       ast_cond_wait(&std.cond, &std.lock);
+               }
+               ast_mutex_unlock(&std.lock);
 
-       return 0;
+               ast_mutex_destroy(&std.lock);
+               ast_cond_destroy(&std.cond);
+       }
 }
 
-void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+/*!
+ * \internal \brief Publish a message to a topic's subscribers
+ * \brief topic The topic to publish to
+ * \brief message The message to publish
+ * \brief sync_sub An optional subscriber of the topic to publish synchronously
+ * to
+ */
+static void publish_msg(struct stasis_topic *topic,
+       struct stasis_message *message, struct stasis_subscription *sync_sub)
 {
        size_t i;
-       SCOPED_AO2LOCK(lock, topic);
 
        ast_assert(topic != NULL);
-       ast_assert(publisher_topic != NULL);
        ast_assert(message != NULL);
 
-       for (i = 0; i < topic->num_subscribers_current; ++i) {
-               struct stasis_subscription *sub = topic->subscribers[i];
-               RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+       /* If there are no subscribers don't bother */
+       if (!stasis_topic_subscribers(topic)) {
+               return;
+       }
 
-               ast_assert(sub != NULL);
+       /*
+        * The topic may be unref'ed by the subscription invocation.
+        * Make sure we hold onto a reference while dispatching.
+        */
+       ao2_ref(topic, +1);
+       ao2_lock(topic);
+       for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
+               struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
 
-               dispatch = dispatch_create(publisher_topic, message, sub);
-               if (!dispatch) {
-                       ast_log(LOG_DEBUG, "Dropping dispatch\n");
-                       break;
-               }
+               ast_assert(sub != NULL);
 
-               if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
-                       dispatch = NULL; /* Ownership transferred to mailbox */
-               }
+               dispatch_message(sub, message, (sub == sync_sub));
        }
+       ao2_unlock(topic);
+       ao2_ref(topic, -1);
 }
 
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
 {
-       stasis_forward_message(topic, topic, message);
+       publish_msg(topic, message, NULL);
+}
+
+void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
+{
+       ast_assert(sub != NULL);
+
+       publish_msg(sub->topic, message, sub);
 }
 
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \brief Forwarding information
+ *
+ * Any message posted to \a from_topic is forwarded to \a to_topic.
+ *
+ * In cases where both the \a from_topic and \a to_topic need to be locked,
+ * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
+ */
+struct stasis_forward {
+       /*! Originating topic */
+       struct stasis_topic *from_topic;
+       /*! Destination topic */
+       struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
 {
-       struct stasis_topic *to_topic = data;
-       stasis_forward_message(to_topic, topic, message);
+       struct stasis_forward *forward = obj;
+
+       ao2_cleanup(forward->from_topic);
+       forward->from_topic = NULL;
+       ao2_cleanup(forward->to_topic);
+       forward->to_topic = NULL;
+}
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
+{
+       int idx;
+       struct stasis_topic *from;
+       struct stasis_topic *to;
+
+       if (!forward) {
+               return NULL;
+       }
+
+       from = forward->from_topic;
+       to = forward->to_topic;
 
-       if (stasis_subscription_final_message(sub, message)) {
-               ao2_cleanup(to_topic);
+       if (from && to) {
+               topic_lock_both(to, from);
+               AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
+                       AST_VECTOR_ELEM_CLEANUP_NOOP);
+
+               for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
+                       topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+               }
+               ao2_unlock(from);
+               ao2_unlock(to);
        }
+
+       ao2_cleanup(forward);
+
+       return NULL;
 }
 
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+       struct stasis_topic *to_topic)
 {
-       struct stasis_subscription *sub;
+       int res;
+       size_t idx;
+       struct stasis_forward *forward;
+
        if (!from_topic || !to_topic) {
                return NULL;
        }
 
-       sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
-       if (sub) {
-               /* hold a ref to to_topic for this forwarding subscription */
-               ao2_ref(to_topic, +1);
+       forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+       if (!forward) {
+               return NULL;
        }
-       return sub;
+
+       /* Forwards to ourselves are implicit. */
+       if (to_topic == from_topic) {
+               return forward;
+       }
+
+       forward->from_topic = ao2_bump(from_topic);
+       forward->to_topic = ao2_bump(to_topic);
+
+       topic_lock_both(to_topic, from_topic);
+       res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
+       if (res != 0) {
+               ao2_unlock(from_topic);
+               ao2_unlock(to_topic);
+               ao2_ref(forward, -1);
+               return NULL;
+       }
+
+       for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
+               topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
+       }
+       ao2_unlock(from_topic);
+       ao2_unlock(to_topic);
+
+       return forward;
 }
 
 static void subscription_change_dtor(void *obj)
 {
        struct stasis_subscription_change *change = obj;
+
        ast_string_field_free_memory(change);
        ao2_cleanup(change->topic);
 }
 
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
 {
-       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+       struct stasis_subscription_change *change;
 
        change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
-       if (ast_string_field_init(change, 128)) {
+       if (!change || ast_string_field_init(change, 128)) {
+               ao2_cleanup(change);
                return NULL;
        }
 
@@ -409,51 +1096,88 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
        ao2_ref(topic, +1);
        change->topic = topic;
 
-       ao2_ref(change, +1);
        return change;
 }
 
-struct stasis_message_type *stasis_subscription_change(void)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-       return __subscription_change_message_type;
+       struct stasis_subscription_change *change;
+       struct stasis_message *msg;
+
+       /* This assumes that we have already unsubscribed */
+       ast_assert(stasis_subscription_is_subscribed(sub));
+
+       if (!stasis_subscription_change_type()) {
+               return;
+       }
+
+       change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
+       if (!change) {
+               return;
+       }
+
+       msg = stasis_message_create(stasis_subscription_change_type(), change);
+       if (!msg) {
+               ao2_cleanup(change);
+               return;
+       }
+
+       stasis_publish(topic, msg);
+       ao2_cleanup(msg);
+       ao2_cleanup(change);
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+       struct stasis_subscription *sub)
 {
-       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+       struct stasis_subscription_change *change;
+       struct stasis_message *msg;
 
-       change = subscription_change_alloc(topic, uniqueid, description);
+       /* This assumes that we have already unsubscribed */
+       ast_assert(!stasis_subscription_is_subscribed(sub));
 
-       if (!change) {
+       if (!stasis_subscription_change_type()) {
                return;
        }
 
-       msg = stasis_message_create(stasis_subscription_change(), change);
+       change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+       if (!change) {
+               return;
+       }
 
+       msg = stasis_message_create(stasis_subscription_change_type(), change);
        if (!msg) {
+               ao2_cleanup(change);
                return;
        }
 
        stasis_publish(topic, msg);
+
+       /* Now we have to dispatch to the subscription itself */
+       dispatch_message(sub, msg, 0);
+
+       ao2_cleanup(msg);
+       ao2_cleanup(change);
 }
 
 struct topic_pool_entry {
-       struct stasis_subscription *forward;
+       struct stasis_forward *forward;
        struct stasis_topic *topic;
 };
 
 static void topic_pool_entry_dtor(void *obj)
 {
        struct topic_pool_entry *entry = obj;
-       entry->forward = stasis_unsubscribe(entry->forward);
+
+       entry->forward = stasis_forward_cancel(entry->forward);
        ao2_cleanup(entry->topic);
        entry->topic = NULL;
 }
 
 static struct topic_pool_entry *topic_pool_entry_alloc(void)
 {
-       return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
+       return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
+               AO2_ALLOC_OPT_LOCK_NOLOCK);
 }
 
 struct stasis_topic_pool {
@@ -464,6 +1188,16 @@ struct stasis_topic_pool {
 static void topic_pool_dtor(void *obj)
 {
        struct stasis_topic_pool *pool = obj;
+
+#ifdef AO2_DEBUG
+       {
+               char *container_name =
+                       ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
+               sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
+               ao2_container_unregister(container_name);
+       }
+#endif
+
        ao2_cleanup(pool->pool_container);
        pool->pool_container = NULL;
        ao2_cleanup(pool->pool_topic);
@@ -472,43 +1206,121 @@ static void topic_pool_dtor(void *obj)
 
 static int topic_pool_entry_hash(const void *obj, const int flags)
 {
-       const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
-       return ast_str_case_hash(topic_name);
+       const struct topic_pool_entry *object;
+       const char *key;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_KEY:
+               key = obj;
+               break;
+       case OBJ_SEARCH_OBJECT:
+               object = obj;
+               key = stasis_topic_name(object->topic);
+               break;
+       default:
+               /* Hash can only work on something with a full key. */
+               ast_assert(0);
+               return 0;
+       }
+       return ast_str_case_hash(key);
 }
 
 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
 {
-       struct topic_pool_entry *opt1 = obj, *opt2 = arg;
-       const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
-       return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
+       const struct topic_pool_entry *object_left = obj;
+       const struct topic_pool_entry *object_right = arg;
+       const char *right_key = arg;
+       int cmp;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_OBJECT:
+               right_key = stasis_topic_name(object_right->topic);
+               /* Fall through */
+       case OBJ_SEARCH_KEY:
+               cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
+               break;
+       case OBJ_SEARCH_PARTIAL_KEY:
+               /* Not supported by container */
+               ast_assert(0);
+               cmp = -1;
+               break;
+       default:
+               /*
+                * What arg points to is specific to this traversal callback
+                * and has no special meaning to astobj2.
+                */
+               cmp = 0;
+               break;
+       }
+       if (cmp) {
+               return 0;
+       }
+       /*
+        * At this point the traversal callback is identical to a sorted
+        * container.
+        */
+       return CMP_MATCH;
+}
+
+#ifdef AO2_DEBUG
+static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
+{
+       struct topic_pool_entry *entry = v_obj;
+
+       if (!entry) {
+               return;
+       }
+       prnt(where, "%s", stasis_topic_name(entry->topic));
 }
+#endif
 
 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
 {
-       RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
+       struct stasis_topic_pool *pool;
+
+       pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
        if (!pool) {
                return NULL;
        }
-       pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
+
+       pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
+               topic_pool_entry_hash, topic_pool_entry_cmp);
+       if (!pool->pool_container) {
+               ao2_cleanup(pool);
+               return NULL;
+       }
+
+#ifdef AO2_DEBUG
+       {
+               char *container_name =
+                       ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
+               sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
+               ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
+       }
+#endif
+
        ao2_ref(pooled_topic, +1);
        pool->pool_topic = pooled_topic;
 
-       ao2_ref(pool, +1);
        return pool;
 }
 
+void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
+{
+       ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
+}
+
 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
 {
        RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
-       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
 
+       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
        if (topic_pool_entry) {
                return topic_pool_entry->topic;
        }
 
        topic_pool_entry = topic_pool_entry_alloc();
-
        if (!topic_pool_entry) {
                return NULL;
        }
@@ -523,43 +1335,498 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
                return NULL;
        }
 
-       ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
+       if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
+               return NULL;
+       }
 
        return topic_pool_entry->topic;
 }
 
-/*! \brief Cleanup function */
-static void stasis_exit(void)
+int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
+{
+       struct topic_pool_entry *topic_pool_entry;
+
+       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
+       if (!topic_pool_entry) {
+               return 0;
+       }
+
+       ao2_ref(topic_pool_entry, -1);
+       return 1;
+}
+
+void stasis_log_bad_type_access(const char *name)
+{
+#ifdef AST_DEVMODE
+       if (!stasis_message_type_declined(name)) {
+               ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
+       }
+#endif
+}
+
+/*! \brief A multi object blob data structure to carry user event stasis messages */
+struct ast_multi_object_blob {
+       struct ast_json *blob;                             /*< A blob of JSON data */
+       AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
+};
+
+/*!
+ * \internal
+ * \brief Destructor for \ref ast_multi_object_blob objects
+ */
+static void multi_object_blob_dtor(void *obj)
+{
+       struct ast_multi_object_blob *multi = obj;
+       int type;
+       int i;
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+                       ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
+               }
+               AST_VECTOR_FREE(&multi->snapshots[type]);
+       }
+       ast_json_unref(multi->blob);
+}
+
+/*! \brief Create a stasis user event multi object blob */
+struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
+{
+       int type;
+       struct ast_multi_object_blob *multi;
+
+       ast_assert(blob != NULL);
+
+       multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
+       if (!multi) {
+               return NULL;
+       }
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
+                       ao2_ref(multi, -1);
+
+                       return NULL;
+               }
+       }
+
+       multi->blob = ast_json_ref(blob);
+
+       return multi;
+}
+
+/*! \brief Add an object (snapshot) to the blob */
+void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
+       enum stasis_user_multi_object_snapshot_type type, void *object)
+{
+       if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
+               ao2_cleanup(object);
+       }
+}
+
+/*! \brief Publish single channel user event (for app_userevent compatibility) */
+void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
+       struct stasis_message_type *type, struct ast_json *blob)
+{
+       struct stasis_message *message;
+       struct ast_channel_snapshot *channel_snapshot;
+       struct ast_multi_object_blob *multi;
+
+       if (!type) {
+               return;
+       }
+
+       multi = ast_multi_object_blob_create(blob);
+       if (!multi) {
+               return;
+       }
+
+       channel_snapshot = ast_channel_snapshot_create(chan);
+       if (!channel_snapshot) {
+               ao2_ref(multi, -1);
+               return;
+       }
+
+       /* this call steals the channel_snapshot reference */
+       ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
+
+       message = stasis_message_create(type, multi);
+       ao2_ref(multi, -1);
+       if (message) {
+               /* app_userevent still publishes to channel */
+               stasis_publish(ast_channel_topic(chan), message);
+               ao2_ref(message, -1);
+       }
+}
+
+/*! \internal \brief convert multi object blob to ari json */
+static struct ast_json *multi_user_event_to_json(
+       struct stasis_message *message,
+       const struct stasis_message_sanitizer *sanitize)
+{
+       struct ast_json *out;
+       struct ast_multi_object_blob *multi = stasis_message_data(message);
+       struct ast_json *blob = multi->blob;
+       const struct timeval *tv = stasis_message_timestamp(message);
+       enum stasis_user_multi_object_snapshot_type type;
+       int i;
+
+       out = ast_json_object_create();
+       if (!out) {
+               return NULL;
+       }
+
+       ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
+       ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
+       ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
+       ast_json_object_set(out, "userevent", ast_json_ref(blob));
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+                       struct ast_json *json_object = NULL;
+                       char *name = NULL;
+                       void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
+
+                       switch (type) {
+                       case STASIS_UMOS_CHANNEL:
+                               json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
+                               name = "channel";
+                               break;
+                       case STASIS_UMOS_BRIDGE:
+                               json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
+                               name = "bridge";
+                               break;
+                       case STASIS_UMOS_ENDPOINT:
+                               json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
+                               name = "endpoint";
+                               break;
+                       }
+                       if (json_object) {
+                               ast_json_object_set(out, name, json_object);
+                       }
+               }
+       }
+
+       return out;
+}
+
+/*! \internal \brief convert multi object blob to ami string */
+static struct ast_str *multi_object_blob_to_ami(void *obj)
+{
+       struct ast_str *ami_str=ast_str_create(1024);
+       struct ast_str *ami_snapshot;
+       const struct ast_multi_object_blob *multi = obj;
+       enum stasis_user_multi_object_snapshot_type type;
+       int i;
+
+       if (!ami_str) {
+               return NULL;
+       }
+       if (!multi) {
+               ast_free(ami_str);
+               return NULL;
+       }
+
+       for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+               for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+                       char *name = NULL;
+                       void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
+                       ami_snapshot = NULL;
+
+                       if (i > 0) {
+                               ast_asprintf(&name, "%d", i + 1);
+                       }
+
+                       switch (type) {
+                       case STASIS_UMOS_CHANNEL:
+                               ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
+                               break;
+
+                       case STASIS_UMOS_BRIDGE:
+                               ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
+                               break;
+
+                       case STASIS_UMOS_ENDPOINT:
+                               /* currently not sending endpoint snapshots to AMI */
+                               break;
+                       }
+                       if (ami_snapshot) {
+                               ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
+                               ast_free(ami_snapshot);
+                       }
+                       ast_free(name);
+               }
+       }
+
+       return ami_str;
+}
+
+/*! \internal \brief Callback to pass only user defined parameters from blob */
+static int userevent_exclusion_cb(const char *key)
+{
+       if (!strcmp("eventname", key)) {
+               return 1;
+       }
+       return 0;
+}
+
+static struct ast_manager_event_blob *multi_user_event_to_ami(
+       struct stasis_message *message)
+{
+       RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
+       RAII_VAR(struct ast_str *, body, NULL, ast_free);
+       struct ast_multi_object_blob *multi = stasis_message_data(message);
+       const char *eventname;
+
+       eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
+       body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
+       object_string = multi_object_blob_to_ami(multi);
+       if (!object_string || !body) {
+               return NULL;
+       }
+
+       return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
+               "%s"
+               "UserEvent: %s\r\n"
+               "%s",
+               ast_str_buffer(object_string),
+               eventname,
+               ast_str_buffer(body));
+}
+
+/*! \brief A structure to hold global configuration-related options */
+struct stasis_declined_config {
+       /*! The list of message types to decline */
+       struct ao2_container *declined;
+};
+
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+       /*! Initial size of the thread pool */
+       int initial_size;
+       /*! Time, in seconds, before we expire a thread */
+       int idle_timeout_sec;
+       /*! Maximum number of thread to allow */
+       int max_size;
+};
+
+struct stasis_config {
+       /*! Thread pool configuration options */
+       struct stasis_threadpool_conf *threadpool_options;
+       /*! Declined message types */
+       struct stasis_declined_config *declined_message_types;
+};
+
+static struct aco_type threadpool_option = {
+       .type = ACO_GLOBAL,
+       .name = "threadpool",
+       .item_offset = offsetof(struct stasis_config, threadpool_options),
+       .category = "threadpool",
+       .category_match = ACO_WHITELIST_EXACT,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
+/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
+static struct aco_type declined_option = {
+       .type = ACO_GLOBAL,
+       .name = "declined_message_types",
+       .item_offset = offsetof(struct stasis_config, declined_message_types),
+       .category_match = ACO_WHITELIST_EXACT,
+       .category = "declined_message_types",
+};
+
+struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
+
+struct aco_file stasis_conf = {
+        .filename = "stasis.conf",
+       .types = ACO_TYPES(&declined_option, &threadpool_option),
+};
+
+/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
+static AO2_GLOBAL_OBJ_STATIC(globals);
+
+static void *stasis_config_alloc(void);
+
+/*! \brief Register information about the configs being processed by this module */
+CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
+        .files = ACO_FILES(&stasis_conf),
+);
+
+static void stasis_declined_config_destructor(void *obj)
+{
+       struct stasis_declined_config *declined = obj;
+
+       ao2_cleanup(declined->declined);
+}
+
+static void stasis_config_destructor(void *obj)
+{
+       struct stasis_config *cfg = obj;
+
+       ao2_cleanup(cfg->declined_message_types);
+       ast_free(cfg->threadpool_options);
+}
+
+static void *stasis_config_alloc(void)
+{
+       struct stasis_config *cfg;
+
+       if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
+               return NULL;
+       }
+
+       cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+       if (!cfg->threadpool_options) {
+               ao2_ref(cfg, -1);
+               return NULL;
+       }
+
+       cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
+               stasis_declined_config_destructor);
+       if (!cfg->declined_message_types) {
+               ao2_ref(cfg, -1);
+               return NULL;
+       }
+
+       cfg->declined_message_types->declined = ast_str_container_alloc(13);
+       if (!cfg->declined_message_types->declined) {
+               ao2_ref(cfg, -1);
+               return NULL;
+       }
+
+       return cfg;
+}
+
+int stasis_message_type_declined(const char *name)
+{
+       struct stasis_config *cfg = ao2_global_obj_ref(globals);
+       char *name_in_declined;
+       int res;
+
+       if (!cfg || !cfg->declined_message_types) {
+               ao2_cleanup(cfg);
+               return 0;
+       }
+
+       name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
+       res = name_in_declined ? 1 : 0;
+       ao2_cleanup(name_in_declined);
+       ao2_ref(cfg, -1);
+       if (res) {
+               ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
+       }
+       return res;
+}
+
+static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+       struct stasis_declined_config *declined = obj;
+
+       if (ast_strlen_zero(var->value)) {
+               return 0;
+       }
+
+       if (ast_str_container_add(declined->declined, var->value)) {
+               return -1;
+       }
+
+       return 0;
+}
+
+/*!
+ * @{ \brief Define multi user event message type(s).
+ */
+
+STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
+       .to_json = multi_user_event_to_json,
+       .to_ami = multi_user_event_to_ami,
+       );
+
+/*! @} */
+
+/*! \brief Cleanup function for graceful shutdowns */
+static void stasis_cleanup(void)
 {
-       ao2_cleanup(__subscription_change_message_type);
-       __subscription_change_message_type = NULL;
        ast_threadpool_shutdown(pool);
        pool = NULL;
+       STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
+       aco_info_destroy(&cfg_info);
+       ao2_global_obj_release(globals);
 }
 
 int stasis_init(void)
 {
+       struct stasis_config *cfg;
        int cache_init;
+       struct ast_threadpool_options threadpool_opts = { 0, };
 
-       /* XXX Should this be configurable? */
-       struct ast_threadpool_options opts = {
-               .version = AST_THREADPOOL_OPTIONS_VERSION,
-               .idle_timeout = 20,
-               .auto_increment = 1,
-               .initial_size = 0,
-               .max_size = 200
-       };
+       /* Be sure the types are cleaned up after the message bus */
+       ast_register_cleanup(stasis_cleanup);
 
-       ast_register_atexit(stasis_exit);
-
-       if (pool) {
-               ast_log(LOG_ERROR, "Stasis double-initialized\n");
+       if (aco_info_init(&cfg_info)) {
                return -1;
        }
 
-       pool = ast_threadpool_create("stasis-core", NULL, &opts);
+       aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
+               declined_options, "", declined_handler, 0);
+       aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+               threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+               INT_MAX);
+       aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+               threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+               INT_MAX);
+       aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+               threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+               FLDSET(struct stasis_threadpool_conf, max_size), 0,
+               INT_MAX);
+
+       if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
+               struct stasis_config *default_cfg = stasis_config_alloc();
+
+               if (!default_cfg) {
+                       return -1;
+               }
+
+               if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+                       ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+                       ao2_ref(default_cfg, -1);
+
+                       return -1;
+               }
+
+               if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
+                       ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
+                       ao2_ref(default_cfg, -1);
+
+                       return -1;
+               }
+
+               ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+               ao2_global_obj_replace_unref(globals, default_cfg);
+               cfg = default_cfg;
+       } else {
+               cfg = ao2_global_obj_ref(globals);
+               if (!cfg) {
+                       ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+
+                       return -1;
+               }
+       }
+
+       threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+       threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+       threadpool_opts.auto_increment = 1;
+       threadpool_opts.max_size = cfg->threadpool_options->max_size;
+       threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+       pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+       ao2_ref(cfg, -1);
        if (!pool) {
-               ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
+               ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+
                return -1;
        }
 
@@ -568,8 +1835,10 @@ int stasis_init(void)
                return -1;
        }
 
-       __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
-       if (!__subscription_change_message_type) {
+       if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
+               return -1;
+       }
+       if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
                return -1;
        }