This patch adds a new message bus API to Asterisk.
authorDavid M. Lee <dlee@digium.com>
Fri, 8 Mar 2013 15:15:13 +0000 (15:15 +0000)
committerDavid M. Lee <dlee@digium.com>
Fri, 8 Mar 2013 15:15:13 +0000 (15:15 +0000)
For the initial use of this bus, I took some work kmoore did creating
channel snapshots. So rather than create AMI events directly in the
channel code, this patch generates Stasis events, which manager.c uses
to then publish the AMI event.

This message bus provides a generic publish/subscribe mechanism within
Asterisk. This message bus is:

 - Loosely coupled; new message types can be added in seperate modules.
 - Easy to use; publishing and subscribing are straightforward
   operations.

In addition to basic publish/subscribe, the patch also provides
mechanisms for message forwarding, and for message caching.

(issue ASTERISK-20887)
(closes issue ASTERISK-20959)
Review: https://reviewboard.asterisk.org/r/2339/

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@382685 65c4cc65-6c06-0410-ace0-fbb531ad65f3

13 files changed:
include/asterisk/channel.h
include/asterisk/channel_internal.h
include/asterisk/stasis.h [new file with mode: 0644]
main/asterisk.c
main/asterisk.exports.in
main/channel.c
main/channel_internal_api.c
main/manager.c
main/pbx.c
main/stasis.c [new file with mode: 0644]
main/stasis_cache.c [new file with mode: 0644]
main/stasis_message.c [new file with mode: 0644]
tests/test_stasis.c [new file with mode: 0644]

index f7e4d2d..eee0828 100644 (file)
@@ -125,7 +125,6 @@ References:
 
 #include "asterisk/abstract_jb.h"
 #include "asterisk/astobj2.h"
-
 #include "asterisk/poll-compat.h"
 
 #if defined(__cplusplus) || defined(c_plusplus)
@@ -151,6 +150,7 @@ extern "C" {
 #include "asterisk/channelstate.h"
 #include "asterisk/ccss.h"
 #include "asterisk/framehook.h"
+#include "asterisk/stasis.h"
 
 #define DATASTORE_INHERIT_FOREVER      INT_MAX
 
@@ -4102,4 +4102,119 @@ int ast_channel_dialed_causes_add(const struct ast_channel *chan, const struct a
 void ast_channel_dialed_causes_clear(const struct ast_channel *chan);
 
 struct ast_flags *ast_channel_flags(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Structure representing a snapshot of channel state.
+ *
+ * While not enforced programmatically, this object is shared across multiple
+ * threads, and should be threated as an immutable object.
+ */
+struct ast_channel_snapshot {
+       AST_DECLARE_STRING_FIELDS(
+               AST_STRING_FIELD(name);                 /*!< ASCII unique channel name */
+               AST_STRING_FIELD(accountcode);          /*!< Account code for billing */
+               AST_STRING_FIELD(peeraccount);          /*!< Peer account code for billing */
+               AST_STRING_FIELD(userfield);            /*!< Userfield for CEL billing */
+               AST_STRING_FIELD(uniqueid);             /*!< Unique Channel Identifier */
+               AST_STRING_FIELD(linkedid);             /*!< Linked Channel Identifier -- gets propagated by linkage */
+               AST_STRING_FIELD(parkinglot);           /*!< Default parking lot, if empty, default parking lot */
+               AST_STRING_FIELD(hangupsource);         /*!< Who is responsible for hanging up this channel */
+               AST_STRING_FIELD(appl);                 /*!< Current application */
+               AST_STRING_FIELD(data);                 /*!< Data passed to current application */
+               AST_STRING_FIELD(context);              /*!< Dialplan: Current extension context */
+               AST_STRING_FIELD(exten);                /*!< Dialplan: Current extension number */
+               AST_STRING_FIELD(caller_name);          /*!< Caller ID Name */
+               AST_STRING_FIELD(caller_number);        /*!< Caller ID Number */
+               AST_STRING_FIELD(connected_name);       /*!< Connected Line Name */
+               AST_STRING_FIELD(connected_number);     /*!< Connected Line Number */
+       );
+
+       struct timeval creationtime;    /*!< The time of channel creation */
+       enum ast_channel_state state;   /*!< State of line */
+       int priority;                   /*!< Dialplan: Current extension priority */
+       int amaflags;                   /*!< AMA flags for billing */
+       int hangupcause;                /*!< Why is the channel hanged up. See causes.h */
+       struct ast_flags flags;         /*!< channel flags of AST_FLAG_ type */
+};
+
+/*!
+ * \since 12
+ * \brief Generate a snapshot of the channel state. This is an ao2 object, so
+ * ao2_cleanup() to deallocate.
+ *
+ * \param chan The channel from which to generate a snapshot
+ *
+ * \retval pointer on success (must be ast_freed)
+ * \retval NULL on error
+ */
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_snapshot.
+ *
+ * \retval Message type for \ref ast_channel_snapshot.
+ */
+struct stasis_message_type *ast_channel_snapshot(void);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for a particular channel.
+ *
+ * \param chan Channel.
+ *
+ * \retval Topic for channel's events.
+ * \retval \c NULL if \a chan is \c NULL.
+ */
+struct stasis_topic *ast_channel_topic(struct ast_channel *chan);
+
+/*!
+ * \since 12
+ * \brief A topic which publishes the events for all channels.
+ * \retval Topic for all channel events.
+ */
+struct stasis_topic *ast_channel_topic_all(void);
+
+/*!
+ * \since 12
+ * \brief A caching topic which caches \ref ast_channel_snapshot messages from
+ * ast_channel_events_all(void).
+ *
+ * \retval Topic for all channel events.
+ */
+struct stasis_caching_topic *ast_channel_topic_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Variable set event.
+ */
+struct ast_channel_varset {
+       /*! Channel variable was set on (or NULL for global variable) */
+       struct ast_channel_snapshot *snapshot;
+       /*! Variable name */
+       char *variable;
+       /*! New value */
+       char *value;
+};
+
+/*!
+ * \since 12
+ * \brief Message type for \ref ast_channel_varset messages.
+ *
+ * \retval Message type for \ref ast_channel_varset messages.
+ */
+struct stasis_message_type *ast_channel_varset(void);
+
+/*!
+ * \since 12
+ * \brief Publish a \ref ast_channel_varset for a channel.
+ *
+ * \param chan Channel to pulish the event for, or \c NULL for 'none'.
+ * \param variable Name of the variable being set
+ * \param value Value.
+ */
+void ast_channel_publish_varset(struct ast_channel *chan,
+                               const char *variable, const char *value);
+
 #endif /* _ASTERISK_CHANNEL_H */
index 1b01fe0..38776c1 100644 (file)
@@ -23,4 +23,5 @@ struct ast_channel *__ast_channel_internal_alloc(void (*destructor)(void *obj),
 void ast_channel_internal_finalize(struct ast_channel *chan);
 int ast_channel_internal_is_finalized(struct ast_channel *chan);
 void ast_channel_internal_cleanup(struct ast_channel *chan);
+void ast_channel_internal_setup_topics(struct ast_channel *chan);
 
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
new file mode 100644 (file)
index 0000000..a6ab421
--- /dev/null
@@ -0,0 +1,506 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_H
+#define _ASTERISK_STASIS_H
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
+ * detailed documentation.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ * \since 12
+ *
+ * \page stasis Stasis Message Bus API
+ *
+ * \par Intro
+ *
+ * The Stasis Message Bus is a loosely typed mechanism for distributing messages
+ * within Asterisk. It is designed to be:
+ *  - Loosely coupled; new message types can be added in seperate modules.
+ *  - Easy to use; publishing and subscribing are straightforward operations.
+ *  - Consistent memory management; all message bus objects are AO2 managed
+ *    objects, using ao2_ref() and ao2_cleanup() to manage the reference
+ *    counting.
+ *
+ * There are three main concepts for using the Stasis Message Bus:
+ *  - \ref stasis_message
+ *  - \ref stasis_topic
+ *  - \ref stasis_subscription
+ *
+ * \par stasis_message
+ *
+ * Central to the Stasis Message Bus is the \ref stasis_message, the messages
+ * that are sent on the bus. These messages have:
+ *  - a type (as defined by a \ref stasis_message_type)
+ *  - a value - a \c void pointer to an AO2 object
+ *  - a timestamp when it was created
+ *
+ * Once a \ref stasis_message has been created, it is immutable and cannot
+ * change. The same goes for the value of the message (although this cannot be
+ * enforced in code). Messages themselves are reference-counted, AO2 objects,
+ * along with their values. By being both reference counted and immutable,
+ * messages can be shared throughout the system without any concerns for
+ * threading. (Well, the objects must be allocated with \ref
+ * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread
+ * safe. But other than that, no worries).
+ *
+ * The type of a message is defined by an instance of \ref stasis_message_type,
+ * which can be created by calling stasis_message_type_create(). Message types
+ * are named, which is useful in debugging. It is recommended that the string
+ * name for a message type match the name of the struct that's stored in the
+ * message. For example, name for \ref stasis_cache_update's message type is \c
+ * "stasis_cache_update".
+ *
+ * \par stasis_topic
+ *
+ * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
+ * subscribed, and \ref stasis_message's may be published. Any message published
+ * to the topic is dispatched to all of its subscribers. The topic itself may be
+ * named, which is useful in debugging.
+ *
+ * Topics themselves are reference counted objects, and automagically
+ * unsubscribe all of their subscribers when they are destroyed. Topics are also
+ * thread safe, so no worries about publishing/subscribing/unsubscribing to a
+ * topic concurrently from multiple threads. It's also designed to handle the
+ * case of unsubscribing from a topic from within the subscription handler.
+ *
+ * \par Forwarding
+ *
+ * There is one special case of topics that's worth noting: forwarding
+ * messages. It's a fairly common use case to want to forward all the messages
+ * published on one topic to another one (for example, an aggregator topic that
+ * publishes all the events from a set of other topics). This can be
+ * accomplished easily using stasis_forward_all(). This sets up the forwarding
+ * between the two topics, and returns a \ref stasis_subscription, which can be
+ * unsubscribed to stop the forwarding.
+ *
+ * \par Caching
+ *
+ * Another common use case is to want to cache certain messages that are
+ * published on the bus. Usually these events are snapshots of the current state
+ * in the system, and it's desirable to query that state from the cache without
+ * locking the original object. It's also desirable for subscribers of the
+ * caching topic to receive messages that have both the old cache value and the
+ * new value being put into the cache. For this, we have
+ * stasis_caching_topic_create(), providing it with the topic which publishes
+ * the messages that you wish to cache, and a function that can identify
+ * cacheable messages.
+ *
+ * The returned \ref stasis_caching_topic provides a topic that forwards
+ * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
+ * stasis_cache_update message which provides the old snapshot (or \c NULL if
+ * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
+ * removed from the cache). A stasis_cache_clear_create() message must be sent
+ * to the topic in order to remove entries from the cache.
+ *
+ * As with all things Stasis, the \ref stasis_caching_topic is a reference
+ * counted AO2 object.
+ *
+ * \par stasis_subscriber
+ *
+ * Any topic may be subscribed to by simply providing stasis_subscribe() the
+ * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
+ * data that is passed back to the handler. Invocations on the subscription's
+ * handler are serialized, but different invocations may occur on different
+ * threads (this usually isn't important unless you use thread locals or
+ * something similar).
+ *
+ * Since the topic (by necessity) holds a reference to the subscription,
+ * reference counting alone is insufficient to terminate a subscription. In
+ * order to stop receiving messages, call stasis_unsubscribe() with your \ref
+ * stasis_subscription. This will remove the topic's reference to the
+ * subscription, and allow it to be destroyed when all of the other references
+ * are cleaned up.
+ */
+
+#include "asterisk/utils.h"
+
+/*! @{ */
+
+/*!
+ * \brief Metadata about a \ref stasis_message.
+ * \since 12
+ */
+struct stasis_message_type;
+
+/*!
+ * \brief Register a new message type.
+ *
+ * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
+ * with it.
+ *
+ * \param name Name of the new type.
+ * \return Pointer to the new type.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type_create(const char *name);
+
+/*!
+ * \brief Gets the name of a given message type
+ * \param type The type to get.
+ * \return Name of the type.
+ * \return \c NULL if \a type is \c NULL.
+ * \since 12
+ */
+const char *stasis_message_type_name(const struct stasis_message_type *type);
+
+/*!
+ * \brief Opaque type for a Stasis message.
+ * \since 12
+ */
+struct stasis_message;
+
+/*!
+ * \brief Create a new message.
+ *
+ * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
+ * with it. Messages are also immutable, and must not be modified after they
+ * are initialized. Especially the \a data in the message.
+ *
+ * \param type Type of the message
+ * \param data Immutable data that is the actual contents of the message
+ * \return New message
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
+
+/*!
+ * \brief Get the message type for a \ref stasis_message.
+ * \param msg Message to type
+ * \return Type of \a msg
+ * \return \c NULL if \a msg is \c NULL.
+ * \since 12
+ */
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the data contained in a message.
+ * \param msg Message.
+ * \return Immutable data pointer
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+void *stasis_message_data(const struct stasis_message *msg);
+
+/*!
+ * \brief Get the time when a message was created.
+ * \param msg Message.
+ * \return Pointer to the \a timeval when the message was created.
+ * \return \c NULL if msg is \c NULL.
+ * \since 12
+ */
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic to which messages may be posted, and subscribers, well, subscribe
+ * \since 12
+ */
+struct stasis_topic;
+
+/*!
+ * \brief Create a new topic.
+ * \param name Name of the new topic.
+ * \return New topic instance.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_topic *stasis_topic_create(const char *name);
+
+/*!
+ * \brief Return the name of a topic.
+ * \param topic Topic.
+ * \return Name of the topic.
+ * \return \c NULL if topic is \c NULL.
+ * \since 12
+ */
+const char *stasis_topic_name(const struct stasis_topic *topic);
+
+/*!
+ * \brief Publish a message to a topic's subscribers.
+ * \param topic Topic.
+ * \param message Message to publish.
+ * \since 12
+ */
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Publish a message from a specified topic to all the subscribers of a
+ * possibly different topic.
+ * \param topic Topic to publish message to.
+ * \param topic Original topic message was from.
+ * \param message Message
+ * \since 12
+ */
+void stasis_forward_message(struct stasis_topic *topic,
+                           struct stasis_topic *publisher_topic,
+                           struct stasis_message *message);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
+
+/*!
+ * \brief Callback function type for Stasis subscriptions.
+ * \param data Data field provided with subscription.
+ * \param topic Topic to which the message was published.
+ * \param message Published message.
+ * \since 12
+ */
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+
+/*!
+ * \brief Create a subscription.
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but may not always occur on
+ * the same thread. The invocation order of different subscriptions is
+ * unspecified.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
+                                            stasis_subscription_cb callback,
+                                            void *data);
+
+/*!
+ * \brief Cancel a subscription.
+ *
+ * Note that in an asynchronous system, there may still be messages queued or
+ * in transit to the subscription's callback. These will still be delivered.
+ * There will be a final 'SubscriptionCancelled' message, indicating the
+ * delivery of the final message.
+ *
+ * \param subscription Subscription to cancel.
+ * \since 12
+ */
+void stasis_unsubscribe(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Create a subscription which forwards all messages from one topic to
+ * another.
+ *
+ * Note that the \a topic parameter of the invoked callback will the be \a topic
+ * the message was sent to, not the topic the subscriber subscribed to.
+ *
+ * \param from_topic Topic to forward.
+ * \param to_topic Destination topic of forwarded messages.
+ * \return New forwarding subscription.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*!
+ * \brief Get the unique ID for the subscription.
+ *
+ * \param sub Subscription for which to get the unique ID.
+ * \return Unique ID for the subscription.
+ * \since 12
+ */
+const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
+
+/*!
+ * \brief Returns whether a subscription is currently subscribed.
+ *
+ * Note that there may still be messages queued up to be dispatched to this
+ * subscription, but the stasis_subscription_final_message() has been enqueued.
+ *
+ * \param sub Subscription to check
+ * \return False (zero) if subscription is not subscribed.
+ * \return True (non-zero) if still subscribed.
+ */
+int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
+
+/*!
+ * \brief Determine whether a message is the final message to be received on a subscription.
+ *
+ * \param sub Subscription on which the message was received.
+ * \param msg Message to check.
+ * \return zero if the provided message is not the final message.
+ * \return non-zero if the provided message is the final message.
+ * \since 12
+ */
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
+
+/*!
+ * \brief Holds details about changes to subscriptions for the specified topic
+ * \since 12
+ */
+struct stasis_subscription_change {
+       AST_DECLARE_STRING_FIELDS(
+               AST_STRING_FIELD(uniqueid);     /*!< The unique ID associated with this subscription */
+               AST_STRING_FIELD(description);  /*!< The description of the change to the subscription associated with the uniqueid */
+       );
+       struct stasis_topic *topic;             /*!< The topic the subscription is/was subscribing to */
+};
+
+/*!
+ * \brief Gets the message type for subscription change notices
+ * \return The stasis_message_type for subscription change notices
+ * \since 12
+ */
+struct stasis_message_type *stasis_subscription_change(void);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief A topic wrapper, which caches certain messages.
+ * \since 12
+ */
+struct stasis_caching_topic;
+
+/*!
+ * \brief Message type for cache update messages.
+ * \return Message type for cache update messages.
+ * \since 12
+ */
+struct stasis_message_type *stasis_cache_update(void);
+
+/*!
+ * \brief Cache update message
+ * \since 12
+ */
+struct stasis_cache_update {
+       /*! \brief Topic that published \c new_snapshot */
+       struct stasis_topic *topic;
+       /*! \brief Convenience reference to snapshot type */
+       struct stasis_message_type *type;
+       /*! \brief Old value from the cache */
+       struct stasis_message *old_snapshot;
+       /*! \brief New value */
+       struct stasis_message *new_snapshot;
+};
+
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from its cache.
+ * \param type Message type.
+ * \param id Unique id of the snapshot to clear.
+ * \return Message which, when sent to the \a topic, will clear the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Callback extract a unique identity from a snapshot message.
+ *
+ * This identity is unique to the underlying object of the snapshot, such as the
+ * UniqueId field of a channel.
+ *
+ * \param message Message to extract id from.
+ * \return String representing the snapshot's id.
+ * \return \c NULL if the message_type of the message isn't a handled snapshot.
+ * \since 12
+ */
+typedef const char *(*snapshot_get_id)(struct stasis_message *message);
+
+/*!
+ * \brief Create a topic which monitors and caches messages from another topic.
+ *
+ * The idea is that some topics publish 'snapshots' of some other object's state
+ * that should be cached. When these snapshot messages are received, the cache
+ * is updated, and a stasis_cache_update() message is forwarded, which has both
+ * the original snapshot message and the new message.
+ *
+ * \param original_topic Topic publishing snapshot messages.
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \return New topic which changes snapshot messages to stasis_cache_update()
+ *         messages, and forwards all other messages from the original topic.
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+
+/*!
+ * Unsubscribes a caching topic from its upstream topic.
+ * \param caching_topic Caching topic to unsubscribe
+ * \since 12
+ */
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Returns the topic of cached events from a caching topics.
+ * \param caching_topic The caching topic.
+ * \return The topic that publishes cache update events, along with passthrough events
+ *         from the underlying topic.
+ * \return \c NULL if \a caching_topic is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Retrieve an item from the cache.
+ * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param type Type of message to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ * \return Message from the cache. The cache still owns the message, so
+ *         ao2_ref() if you want to keep it.
+ * \return \c NULL if message is not found.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
+                                       struct stasis_message_type *type,
+                                       const char *id);
+
+/*! @} */
+
+/*! @{ */
+
+/*!
+ * \brief Initialize the Stasis subsystem
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_init(void);
+
+/*!
+ * \private
+ * \brief called by stasis_init() for cache initialization.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_cache_init(void);
+
+/*! @} */
+
+#endif /* _ASTERISK_STASIS_H */
index 1d53719..4e5e58c 100644 (file)
@@ -240,6 +240,7 @@ int daemon(int, int);  /* defined in libresolv of all places */
 #include "asterisk/aoc.h"
 #include "asterisk/uuid.h"
 #include "asterisk/sorcery.h"
+#include "asterisk/stasis.h"
 
 #include "../defaults.h"
 
@@ -4120,6 +4121,11 @@ int main(int argc, char *argv[])
                exit(1);
        }
 
+       if (stasis_init()) {
+               printf("Stasis initialization failed.\n%s", term_quit());
+               exit(1);
+       }
+
        ast_makesocket();
        sigemptyset(&sigs);
        sigaddset(&sigs, SIGHUP);
index 49d3a44..3157b83 100644 (file)
@@ -32,6 +32,7 @@
                LINKER_SYMBOL_PREFIXdialed_interface_info;
                LINKER_SYMBOL_PREFIXstrsep;
                LINKER_SYMBOL_PREFIXsetenv;
+               LINKER_SYMBOL_PREFIXstasis_*;
                LINKER_SYMBOL_PREFIXunsetenv;
                LINKER_SYMBOL_PREFIXstrcasestr;
                LINKER_SYMBOL_PREFIXstrnlen;
index 9c8f32c..df0a67b 100644 (file)
@@ -152,6 +152,15 @@ static AST_RWLIST_HEAD_STATIC(backends, chanlist);
 /*! \brief All active channels on the system */
 static struct ao2_container *channels;
 
+/*! \brief Message type for channel snapshot events */
+static struct stasis_message_type *__channel_snapshot;
+
+static struct stasis_message_type *__channel_varset;
+
+struct stasis_topic *__channel_topic_all;
+
+struct stasis_caching_topic *__channel_topic_all_cached;
+
 /*! \brief map AST_CAUSE's to readable string representations
  *
  * \ref causes.h
@@ -214,6 +223,77 @@ static const struct causes_map causes[] = {
        { AST_CAUSE_INTERWORKING, "INTERWORKING", "Interworking, unspecified" },
 };
 
+static void publish_channel_state(struct ast_channel *chan)
+{
+       RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+       snapshot = ast_channel_snapshot_create(chan);
+       if (!snapshot) {
+               ast_log(LOG_ERROR, "Allocation error\n");
+               return;
+       }
+
+       message = stasis_message_create(ast_channel_snapshot(), snapshot);
+       if (!message) {
+               return;
+       }
+
+       ast_assert(ast_channel_topic(chan) != NULL);
+       stasis_publish(ast_channel_topic(chan), message);
+}
+
+static void channel_varset_dtor(void *obj)
+{
+       struct ast_channel_varset *event = obj;
+       ao2_cleanup(event->snapshot);
+       ast_free(event->variable);
+       ast_free(event->value);
+}
+
+void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
+{
+       RAII_VAR(struct ast_channel_varset *, event, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+       event = ao2_alloc(sizeof(*event), channel_varset_dtor);
+       if (!event) {
+               return;
+       }
+
+       if (chan) {
+               event->snapshot = ast_channel_snapshot_create(chan);
+               if (event->snapshot == NULL) {
+                       return;
+               }
+       }
+       event->variable = ast_strdup(name);
+       event->value = ast_strdup(value);
+       if (event->variable == NULL || event->value == NULL) {
+               return;
+       }
+
+       msg = stasis_message_create(ast_channel_varset(), event);
+       if (!msg) {
+               return;
+       }
+
+       if (chan) {
+               stasis_publish(ast_channel_topic(chan), msg);
+       } else {
+               stasis_publish(ast_channel_topic_all(), msg);
+       }
+}
+
+
+static void publish_cache_clear(struct ast_channel *chan)
+{
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+       message = stasis_cache_clear_create(ast_channel_snapshot(), ast_channel_uniqueid(chan));
+       stasis_publish(ast_channel_topic(chan), message);
+}
+
 struct ast_variable *ast_channeltype_list(void)
 {
        struct chanlist *cl;
@@ -1073,6 +1153,8 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
                ast_channel_linkedid_set(tmp, ast_channel_uniqueid(tmp));
        }
 
+       ast_channel_internal_setup_topics(tmp);
+
        if (!ast_strlen_zero(name_fmt)) {
                char *slash, *slash2;
                /* Almost every channel is calling this function, and setting the name via the ast_string_field_build() call.
@@ -1145,34 +1227,7 @@ __ast_channel_alloc_ap(int needqueue, int state, const char *cid_num, const char
         * a lot of data into this func to do it here!
         */
        if (ast_get_channel_tech(tech) || (tech2 && ast_get_channel_tech(tech2))) {
-               /*** DOCUMENTATION
-                       <managerEventInstance>
-                               <synopsis>Raised when a new channel is created.</synopsis>
-                               <syntax>
-                                       <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelState'])" />
-                                       <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newstate']/managerEventInstance/syntax/parameter[@name='ChannelStateDesc'])" />
-                               </syntax>
-                       </managerEventInstance>
-               ***/
-               ast_manager_event(tmp, EVENT_FLAG_CALL, "Newchannel",
-                       "Channel: %s\r\n"
-                       "ChannelState: %d\r\n"
-                       "ChannelStateDesc: %s\r\n"
-                       "CallerIDNum: %s\r\n"
-                       "CallerIDName: %s\r\n"
-                       "AccountCode: %s\r\n"
-                       "Exten: %s\r\n"
-                       "Context: %s\r\n"
-                       "Uniqueid: %s\r\n",
-                       ast_channel_name(tmp),
-                       state,
-                       ast_state2str(state),
-                       S_OR(cid_num, ""),
-                       S_OR(cid_name, ""),
-                       ast_channel_accountcode(tmp),
-                       S_OR(exten, ""),
-                       S_OR(context, ""),
-                       ast_channel_uniqueid(tmp));
+               publish_channel_state(tmp);
        }
 
        ast_channel_internal_finalize(tmp);
@@ -2893,39 +2948,9 @@ int ast_hangup(struct ast_channel *chan)
        ast_channel_unlock(chan);
 
        ast_cc_offer(chan);
-       /*** DOCUMENTATION
-               <managerEventInstance>
-                       <synopsis>Raised when a channel is hung up.</synopsis>
-                               <syntax>
-                                       <parameter name="Cause">
-                                               <para>A numeric cause code for why the channel was hung up.</para>
-                                       </parameter>
-                                       <parameter name="Cause-txt">
-                                               <para>A description of why the channel was hung up.</para>
-                                       </parameter>
-                               </syntax>
-               </managerEventInstance>
-       ***/
-       ast_manager_event(chan, EVENT_FLAG_CALL, "Hangup",
-               "Channel: %s\r\n"
-               "Uniqueid: %s\r\n"
-               "CallerIDNum: %s\r\n"
-               "CallerIDName: %s\r\n"
-               "ConnectedLineNum: %s\r\n"
-               "ConnectedLineName: %s\r\n"
-               "AccountCode: %s\r\n"
-               "Cause: %d\r\n"
-               "Cause-txt: %s\r\n",
-               ast_channel_name(chan),
-               ast_channel_uniqueid(chan),
-               S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, "<unknown>"),
-               S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, "<unknown>"),
-               S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, "<unknown>"),
-               S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, "<unknown>"),
-               ast_channel_accountcode(chan),
-               ast_channel_hangupcause(chan),
-               ast_cause2str(ast_channel_hangupcause(chan))
-               );
+
+       publish_channel_state(chan);
+       publish_cache_clear(chan);
 
        if (ast_channel_cdr(chan) && !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_BRIDGED) &&
                !ast_test_flag(ast_channel_cdr(chan), AST_CDR_FLAG_POST_DISABLED) &&
@@ -7435,47 +7460,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state)
         * we override what they are saying the state is and things go amuck. */
        ast_devstate_changed_literal(AST_DEVICE_UNKNOWN, (ast_test_flag(ast_channel_flags(chan), AST_FLAG_DISABLE_DEVSTATE_CACHE) ? AST_DEVSTATE_NOT_CACHABLE : AST_DEVSTATE_CACHABLE), name);
 
-       /* setstate used to conditionally report Newchannel; this is no more */
-       /*** DOCUMENTATION
-               <managerEventInstance>
-                       <synopsis>Raised when a channel's state changes.</synopsis>
-                       <syntax>
-                               <parameter name="ChannelState">
-                                       <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
-                               </parameter>
-                               <parameter name="ChannelStateDesc">
-                                       <enumlist>
-                                               <enum name="Down"/>
-                                               <enum name="Rsrvd"/>
-                                               <enum name="OffHook"/>
-                                               <enum name="Dialing"/>
-                                               <enum name="Ring"/>
-                                               <enum name="Ringing"/>
-                                               <enum name="Up"/>
-                                               <enum name="Busy"/>
-                                               <enum name="Dialing Offhook"/>
-                                               <enum name="Pre-ring"/>
-                                               <enum name="Unknown"/>
-                                       </enumlist>
-                               </parameter>
-                       </syntax>
-               </managerEventInstance>
-       ***/
-       ast_manager_event(chan, EVENT_FLAG_CALL, "Newstate",
-               "Channel: %s\r\n"
-               "ChannelState: %d\r\n"
-               "ChannelStateDesc: %s\r\n"
-               "CallerIDNum: %s\r\n"
-               "CallerIDName: %s\r\n"
-               "ConnectedLineNum: %s\r\n"
-               "ConnectedLineName: %s\r\n"
-               "Uniqueid: %s\r\n",
-               ast_channel_name(chan), ast_channel_state(chan), ast_state2str(ast_channel_state(chan)),
-               S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""),
-               S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""),
-               S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""),
-               S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""),
-               ast_channel_uniqueid(chan));
+       publish_channel_state(chan);
 
        return 0;
 }
@@ -8644,6 +8629,14 @@ static void prnt_channel_key(void *v_obj, void *where, ao2_prnt_fn *prnt)
 
 static void channels_shutdown(void)
 {
+       ao2_cleanup(__channel_snapshot);
+       __channel_snapshot = NULL;
+       ao2_cleanup(__channel_varset);
+       __channel_varset = NULL;
+       ao2_cleanup(__channel_topic_all);
+       __channel_topic_all = NULL;
+       stasis_caching_unsubscribe(__channel_topic_all_cached);
+       __channel_topic_all_cached = NULL;
        ast_data_unregister(NULL);
        ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
        if (channels) {
@@ -8653,6 +8646,16 @@ static void channels_shutdown(void)
        }
 }
 
+static const char *channel_snapshot_get_id(struct stasis_message *message)
+{
+       struct ast_channel_snapshot *snapshot;
+       if (ast_channel_snapshot() != stasis_message_type(message)) {
+               return NULL;
+       }
+       snapshot = stasis_message_data(message);
+       return snapshot->uniqueid;
+}
+
 void ast_channels_init(void)
 {
        channels = ao2_container_alloc(NUM_CHANNEL_BUCKETS,
@@ -8661,6 +8664,12 @@ void ast_channels_init(void)
                ao2_container_register("channels", channels, prnt_channel_key);
        }
 
+       __channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+       __channel_varset = stasis_message_type_create("ast_channel_varset");
+
+       __channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+       __channel_topic_all_cached = stasis_caching_topic_create(__channel_topic_all, channel_snapshot_get_id);
+
        ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
 
        ast_data_register_multiple_core(channel_providers, ARRAY_LEN(channel_providers));
@@ -8668,6 +8677,7 @@ void ast_channels_init(void)
        ast_plc_reload();
 
        ast_register_atexit(channels_shutdown);
+
 }
 
 /*! \brief Print call group and pickup group ---*/
@@ -11241,6 +11251,79 @@ int ast_channel_get_cc_agent_type(struct ast_channel *chan, char *agent_type, si
        return 0;
 }
 
+static void ast_channel_snapshot_dtor(void *obj)
+{
+       struct ast_channel_snapshot *snapshot = obj;
+       ast_string_field_free_memory(snapshot);
+}
+
+struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *chan)
+{
+       RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+       snapshot = ao2_alloc(sizeof(*snapshot), ast_channel_snapshot_dtor);
+       if (ast_string_field_init(snapshot, 1024)) {
+               return NULL;
+       }
+
+       ast_string_field_set(snapshot, name, ast_channel_name(chan));
+       ast_string_field_set(snapshot, accountcode, ast_channel_accountcode(chan));
+       ast_string_field_set(snapshot, peeraccount, ast_channel_peeraccount(chan));
+       ast_string_field_set(snapshot, userfield, ast_channel_userfield(chan));
+       ast_string_field_set(snapshot, uniqueid, ast_channel_uniqueid(chan));
+       ast_string_field_set(snapshot, linkedid, ast_channel_linkedid(chan));
+       ast_string_field_set(snapshot, parkinglot, ast_channel_parkinglot(chan));
+       ast_string_field_set(snapshot, hangupsource, ast_channel_hangupsource(chan));
+       if (ast_channel_appl(chan)) {
+               ast_string_field_set(snapshot, appl, ast_channel_appl(chan));
+       }
+       if (ast_channel_data(chan)) {
+               ast_string_field_set(snapshot, data, ast_channel_data(chan));
+       }
+       ast_string_field_set(snapshot, context, ast_channel_context(chan));
+       ast_string_field_set(snapshot, exten, ast_channel_exten(chan));
+
+       ast_string_field_set(snapshot, caller_name,
+               S_COR(ast_channel_caller(chan)->id.name.valid, ast_channel_caller(chan)->id.name.str, ""));
+       ast_string_field_set(snapshot, caller_number,
+               S_COR(ast_channel_caller(chan)->id.number.valid, ast_channel_caller(chan)->id.number.str, ""));
+
+       ast_string_field_set(snapshot, connected_name,
+               S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
+       ast_string_field_set(snapshot, connected_number,
+               S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+
+       snapshot->creationtime = ast_channel_creationtime(chan);
+       snapshot->state = ast_channel_state(chan);
+       snapshot->priority = ast_channel_priority(chan);
+       snapshot->amaflags = ast_channel_amaflags(chan);
+       snapshot->hangupcause = ast_channel_hangupcause(chan);
+       snapshot->flags = *ast_channel_flags(chan);
+
+       ao2_ref(snapshot, +1);
+       return snapshot;
+}
+
+struct stasis_message_type *ast_channel_varset(void)
+{
+       return __channel_varset;
+}
+
+struct stasis_message_type *ast_channel_snapshot(void)
+{
+       return __channel_snapshot;
+}
+
+struct stasis_topic *ast_channel_topic_all(void)
+{
+       return __channel_topic_all;
+}
+
+struct stasis_caching_topic *ast_channel_topic_all_cached(void)
+{
+       return __channel_topic_all_cached;
+}
+
 /* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY
  *
  * ONLY FUNCTIONS FOR PROVIDING BACKWARDS ABI COMPATIBILITY BELONG HERE
index 3f892dd..8cc2e6c 100644 (file)
@@ -195,6 +195,8 @@ struct ast_channel {
        char dtmf_digit_to_emulate;                     /*!< Digit being emulated */
        char sending_dtmf_digit;                        /*!< Digit this channel is currently sending out. (zero if not sending) */
        struct timeval sending_dtmf_tv;         /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
+       struct stasis_topic *topic;                     /*!< Topic for all channel's events */
+       struct stasis_subscription *forwarder;          /*!< Subscription for event forwarding to all topic */
 };
 
 /* AST_DATA definitions, which will probably have to be re-thought since the channel will be opaque */
@@ -1364,6 +1366,12 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
        }
 
        ast_string_field_free_memory(chan);
+
+       stasis_unsubscribe(chan->forwarder);
+       chan->forwarder = NULL;
+
+       ao2_cleanup(chan->topic);
+       chan->topic = NULL;
 }
 
 void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1375,3 +1383,16 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan)
 {
        return chan->finalized;
 }
+
+struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
+{
+       return chan->topic;
+}
+
+void ast_channel_internal_setup_topics(struct ast_channel *chan)
+{
+       ast_assert(chan->topic == NULL);
+       ast_assert(chan->forwarder == NULL);
+       chan->topic = stasis_topic_create(chan->uniqueid);
+       chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
+}
index fc0ec26..10a3a33 100644 (file)
@@ -91,6 +91,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/aoc.h"
 #include "asterisk/stringfields.h"
 #include "asterisk/presencestate.h"
+#include "asterisk/stasis.h"
 
 /*** DOCUMENTATION
        <manager name="Ping" language="en_US">
@@ -963,6 +964,73 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
                         manager.conf will be present upon starting a new session.</para>
                </description>
        </manager>
+       <managerEvent language="en_US" name="Newchannel">
+               <managerEventInstance class="EVENT_FLAG_CALL">
+                       <synopsis>Raised when a new channel is created.</synopsis>
+                       <syntax>
+                               <parameter name="Channel">
+                               </parameter>
+                               <parameter name="ChannelState">
+                                       <para>A numeric code for the channel's current state, related to ChannelStateDesc</para>
+                               </parameter>
+                               <parameter name="ChannelStateDesc">
+                                       <enumlist>
+                                               <enum name="Down"/>
+                                               <enum name="Rsrvd"/>
+                                               <enum name="OffHook"/>
+                                               <enum name="Dialing"/>
+                                               <enum name="Ring"/>
+                                               <enum name="Ringing"/>
+                                               <enum name="Up"/>
+                                               <enum name="Busy"/>
+                                               <enum name="Dialing Offhook"/>
+                                               <enum name="Pre-ring"/>
+                                               <enum name="Unknown"/>
+                                       </enumlist>
+                               </parameter>
+                               <parameter name="CallerIDNum">
+                               </parameter>
+                               <parameter name="CallerIDName">
+                               </parameter>
+                               <parameter name="ConnectedLineNum">
+                               </parameter>
+                               <parameter name="ConnectedLineName">
+                               </parameter>
+                               <parameter name="AccountCode">
+                               </parameter>
+                               <parameter name="Context">
+                               </parameter>
+                               <parameter name="Exten">
+                               </parameter>
+                               <parameter name="Priority">
+                               </parameter>
+                               <parameter name="Uniqueid">
+                               </parameter>
+                               <parameter name="Cause">
+                                       <para>A numeric cause code for why the channel was hung up.</para>
+                               </parameter>
+                               <parameter name="Cause-txt">
+                                       <para>A description of why the channel was hung up.</para>
+                               </parameter>
+                       </syntax>
+               </managerEventInstance>
+       </managerEvent>
+       <managerEvent language="en_US" name="Newstate">
+               <managerEventInstance class="EVENT_FLAG_CALL">
+                       <synopsis>Raised when a channel's state changes.</synopsis>
+                       <syntax>
+                               <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+                       </syntax>
+               </managerEventInstance>
+       </managerEvent>
+       <managerEvent language="en_US" name="Hangup">
+               <managerEventInstance class="EVENT_FLAG_CALL">
+                       <synopsis>Raised when a channel is hung up.</synopsis>
+                       <syntax>
+                               <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" />
+                       </syntax>
+               </managerEventInstance>
+       </managerEvent>
  ***/
 
 /*! \addtogroup Group_AMI AMI functions
@@ -1060,6 +1128,8 @@ static const struct {
        {{ "restart", "gracefully", NULL }},
 };
 
+static struct stasis_subscription *channel_state_sub;
+
 static void acl_change_event_cb(const struct ast_event *event, void *userdata);
 
 static void acl_change_event_subscribe(void)
@@ -7376,6 +7446,127 @@ static void load_channelvars(struct ast_variable *var)
        AST_RWLIST_UNLOCK(&channelvars);
 }
 
+/*!
+ * \brief Generate the AMI message body from a channel snapshot
+ * \internal
+ *
+ * \param snapshot the channel snapshot for which to generate an AMI message body
+ *
+ * \retval NULL on error
+ * \retval ast_str* on success (must be ast_freed by caller)
+ */
+static struct ast_str *manager_build_channel_state_string(const struct ast_channel_snapshot *snapshot)
+{
+       struct ast_str *out = ast_str_create(1024);
+       int res = 0;
+       if (!out) {
+               return NULL;
+       }
+       res = ast_str_set(&out, 0,
+               "Channel: %s\r\n"
+               "ChannelState: %d\r\n"
+               "ChannelStateDesc: %s\r\n"
+               "CallerIDNum: %s\r\n"
+               "CallerIDName: %s\r\n"
+               "ConnectedLineNum: %s\r\n"
+               "ConnectedLineName: %s\r\n"
+               "AccountCode: %s\r\n"
+               "Context: %s\r\n"
+               "Exten: %s\r\n"
+               "Priority: %d\r\n"
+               "Uniqueid: %s\r\n"
+               "Cause: %d\r\n"
+               "Cause-txt: %s\r\n",
+               snapshot->name,
+               snapshot->state,
+               ast_state2str(snapshot->state),
+               snapshot->caller_number,
+               snapshot->caller_name,
+               snapshot->connected_number,
+               snapshot->connected_name,
+               snapshot->accountcode,
+               snapshot->context,
+               snapshot->exten,
+               snapshot->priority,
+               snapshot->uniqueid,
+               snapshot->hangupcause,
+               ast_cause2str(snapshot->hangupcause));
+
+       if (!res) {
+               return NULL;
+       }
+
+       return out;
+}
+
+static void channel_snapshot_update(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot)
+{
+       int is_hungup;
+       char *manager_event = NULL;
+
+       if (!new_snapshot) {
+               /* Ignore cache clearing events; we'll see the hangup first */
+               return;
+       }
+
+       is_hungup = ast_test_flag(&new_snapshot->flags, AST_FLAG_ZOMBIE) ? 1 : 0;
+
+       if (!old_snapshot) {
+               manager_event = "Newchannel";
+       }
+
+       if (old_snapshot && old_snapshot->state != new_snapshot->state) {
+               manager_event = "Newstate";
+       }
+
+       if (old_snapshot && is_hungup) {
+               manager_event = "Hangup";
+       }
+
+       if (manager_event) {
+               RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+
+               channel_event_string = manager_build_channel_state_string(new_snapshot);
+               if (channel_event_string) {
+                       manager_event(EVENT_FLAG_CALL, manager_event, "%s", ast_str_buffer(channel_event_string));
+               }
+       }
+}
+
+static void channel_varset(const char *channel_name, const char *uniqueid, const char *name, const char *value)
+{
+       /*** DOCUMENTATION
+               <managerEventInstance>
+                       <synopsis>Raised when a variable is set to a particular value.</synopsis>
+               </managerEventInstance>
+       ***/
+       manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
+                     "Channel: %s\r\n"
+                     "Variable: %s\r\n"
+                     "Value: %s\r\n"
+                     "Uniqueid: %s\r\n",
+                     channel_name, name, value, uniqueid);
+}
+
+static void channel_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+       if (stasis_message_type(message) == stasis_cache_update()) {
+               struct stasis_cache_update *update = stasis_message_data(message);
+               if (ast_channel_snapshot() == update->type) {
+                       struct ast_channel_snapshot *old_snapshot =
+                               stasis_message_data(update->old_snapshot);
+                       struct ast_channel_snapshot *new_snapshot =
+                               stasis_message_data(update->new_snapshot);
+                       channel_snapshot_update(old_snapshot, new_snapshot);
+               }
+       } else if (stasis_message_type(message) == ast_channel_varset()) {
+               struct ast_channel_varset *varset = stasis_message_data(message);
+               const char *name = varset->snapshot ? varset->snapshot->name : "none";
+               const char *uniqueid = varset->snapshot ? varset->snapshot->uniqueid : "none";
+               channel_varset(name, uniqueid, varset->variable, varset->value);
+       }
+}
+
 /*! \internal \brief Free a user record.  Should already be removed from the list */
 static void manager_free_user(struct ast_manager_user *user)
 {
@@ -7399,6 +7590,9 @@ static void manager_shutdown(void)
 {
        struct ast_manager_user *user;
 
+       stasis_unsubscribe(channel_state_sub);
+       channel_state_sub = NULL;
+
        if (registered) {
                ast_manager_unregister("Ping");
                ast_manager_unregister("Events");
@@ -7490,6 +7684,12 @@ static int __init_manager(int reload, int by_external_config)
 
        manager_enabled = 0;
 
+       if (!channel_state_sub) {
+               channel_state_sub = stasis_subscribe(
+                       stasis_caching_get_topic(ast_channel_topic_all_cached()),
+                       channel_event_cb, NULL);
+       }
+
        if (!registered) {
                /* Register default actions */
                ast_manager_register_xml_core("Ping", 0, action_ping);
index bf95ccb..82bbb52 100644 (file)
@@ -11453,18 +11453,7 @@ int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const
                        ast_verb(2, "Setting global variable '%s' to '%s'\n", name, value);
                newvariable = ast_var_assign(name, value);
                AST_LIST_INSERT_HEAD(headp, newvariable, entries);
-               /*** DOCUMENTATION
-                       <managerEventInstance>
-                               <synopsis>Raised when a variable is set to a particular value.</synopsis>
-                       </managerEventInstance>
-               ***/
-               manager_event(EVENT_FLAG_DIALPLAN, "VarSet",
-                       "Channel: %s\r\n"
-                       "Variable: %s\r\n"
-                       "Value: %s\r\n"
-                       "Uniqueid: %s\r\n",
-                       chan ? ast_channel_name(chan) : "none", name, value,
-                       chan ? ast_channel_uniqueid(chan) : "none");
+               ast_channel_publish_varset(chan, name, value);
        }
 
        if (chan)
diff --git a/main/stasis.c b/main/stasis.c
new file mode 100644 (file)
index 0000000..f94736b
--- /dev/null
@@ -0,0 +1,514 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message Bus API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+       <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/threadpool.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/utils.h"
+#include "asterisk/uuid.h"
+
+/*! Initial size of the subscribers list. */
+#define INITIAL_SUBSCRIBERS_MAX 4
+
+/*! Threadpool for dispatching notifications to subscribers */
+static struct ast_threadpool *pool;
+
+static struct stasis_message_type *__subscription_change_message_type;
+
+/*! \private */
+struct stasis_topic {
+       char *name;
+       /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
+       struct stasis_subscription **subscribers;
+       /*! Allocated length of the subscribers array */
+       size_t num_subscribers_max;
+       /*! Current size of the subscribers array */
+       size_t num_subscribers_current;
+};
+
+/* Forward declarations for the tightly-coupled subscription object */
+struct stasis_subscription;
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+
+static void topic_dtor(void *obj)
+{
+       struct stasis_topic *topic = obj;
+       ast_free(topic->name);
+       topic->name = NULL;
+       ast_free(topic->subscribers);
+       topic->subscribers = NULL;
+}
+
+struct stasis_topic *stasis_topic_create(const char *name)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+
+       topic = ao2_alloc(sizeof(*topic), topic_dtor);
+
+       if (!topic) {
+               return NULL;
+       }
+
+       topic->name = ast_strdup(name);
+       if (!topic->name) {
+               return NULL;
+       }
+
+       topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
+       topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
+       if (!topic->subscribers) {
+               return NULL;
+       }
+
+       ao2_ref(topic, +1);
+       return topic;
+}
+
+const char *stasis_topic_name(const struct stasis_topic *topic)
+{
+       return topic->name;
+}
+
+/*! \private */
+struct stasis_subscription {
+       /*! Unique ID for this subscription */
+       char *uniqueid;
+       /*! Topic subscribed to. */
+       struct stasis_topic *topic;
+       /*! Mailbox for processing incoming messages. */
+       struct ast_taskprocessor *mailbox;
+       /*! Callback function for incoming message processing. */
+       stasis_subscription_cb callback;
+       /*! Data pointer to be handed to the callback. */
+       void *data;
+};
+
+static void subscription_dtor(void *obj)
+{
+       struct stasis_subscription *sub = obj;
+       ast_assert(!stasis_subscription_is_subscribed(sub));
+       ast_free(sub->uniqueid);
+       sub->uniqueid = NULL;
+       ao2_cleanup(sub->topic);
+       sub->topic = NULL;
+       ast_taskprocessor_unreference(sub->mailbox);
+       sub->mailbox = NULL;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+
+static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+{
+       RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
+       char uniqueid[AST_UUID_STR_LEN];
+
+       sub = ao2_alloc(sizeof(*sub), subscription_dtor);
+       if (!sub) {
+               return NULL;
+       }
+
+       id = ast_uuid_generate();
+       if (!id) {
+               ast_log(LOG_ERROR, "UUID generation failed\n");
+               return NULL;
+       }
+       ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
+       if (needs_mailbox) {
+               sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+               if (!sub->mailbox) {
+                       return NULL;
+               }
+       }
+
+       sub->uniqueid = ast_strdup(uniqueid);
+       ao2_ref(topic, +1);
+       sub->topic = topic;
+       sub->callback = callback;
+       sub->data = data;
+
+       if (topic_add_subscription(topic, sub) != 0) {
+               return NULL;
+       }
+       send_subscription_change_message(topic, uniqueid, "Subscribe");
+
+       ao2_ref(sub, +1);
+       return sub;
+}
+
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+{
+       return __stasis_subscribe(topic, callback, data, 1);
+}
+
+void stasis_unsubscribe(struct stasis_subscription *sub)
+{
+       if (sub) {
+               size_t i;
+               struct stasis_topic *topic = sub->topic;
+               SCOPED_AO2LOCK(lock_topic, topic);
+
+               for (i = 0; i < topic->num_subscribers_current; ++i) {
+                       if (topic->subscribers[i] == sub) {
+                               send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
+                               /* swap [i] with last entry; remove last entry */
+                               topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
+                               /* Unsubscribing unrefs the subscription */
+                               ao2_cleanup(sub);
+                               return;
+                       }
+               }
+
+               ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
+       }
+}
+
+int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
+{
+       if (sub) {
+               size_t i;
+               struct stasis_topic *topic = sub->topic;
+               SCOPED_AO2LOCK(lock_topic, topic);
+
+               for (i = 0; i < topic->num_subscribers_current; ++i) {
+                       if (topic->subscribers[i] == sub) {
+                               return 1;
+                       }
+               }
+       }
+
+       return 0;
+}
+
+const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
+{
+       return sub->uniqueid;
+}
+
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
+{
+       struct stasis_subscription_change *change;
+       if (stasis_message_type(msg) != stasis_subscription_change()) {
+               return 0;
+       }
+
+       change = stasis_message_data(msg);
+       if (strcmp("Unsubscribe", change->description)) {
+               return 0;
+       }
+
+       if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
+               return 0;
+       }
+
+       return 1;
+}
+
+/*!
+ * \brief Add a subscriber to a topic.
+ * \param topic Topic
+ * \param sub Subscriber
+ * \return 0 on success
+ * \return Non-zero on error
+ */
+static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+       struct stasis_subscription **subscribers;
+       SCOPED_AO2LOCK(lock, topic);
+
+       /* Increase list size, if needed */
+       if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
+               subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
+               if (!subscribers) {
+                       return -1;
+               }
+               topic->subscribers = subscribers;
+               topic->num_subscribers_max *= 2;
+       }
+
+       /* Don't ref sub here or we'll cause a reference cycle. */
+       topic->subscribers[topic->num_subscribers_current++] = sub;
+       return 0;
+}
+
+/*!
+ * \private
+ * \brief Information needed to dispatch a message to a subscription
+ */
+struct dispatch {
+       /*! Topic message was published to */
+       struct stasis_topic *topic;
+       /*! The message itself */
+       struct stasis_message *message;
+       /*! Subscription receiving the message */
+       struct stasis_subscription *sub;
+};
+
+static void dispatch_dtor(void *data)
+{
+       struct dispatch *dispatch = data;
+       ao2_cleanup(dispatch->topic);
+       ao2_cleanup(dispatch->message);
+       ao2_cleanup(dispatch->sub);
+}
+
+static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+{
+       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+       ast_assert(topic != NULL);
+       ast_assert(message != NULL);
+       ast_assert(sub != NULL);
+
+       dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
+       if (!dispatch) {
+               return NULL;
+       }
+
+       dispatch->topic = topic;
+       ao2_ref(topic, +1);
+
+       dispatch->message = message;
+       ao2_ref(message, +1);
+
+       dispatch->sub = sub;
+       ao2_ref(sub, +1);
+
+       ao2_ref(dispatch, +1);
+       return dispatch;
+}
+
+/*!
+ * \brief Dispatch a message to a subscriber
+ * \param data \ref dispatch object
+ * \return 0
+ */
+static int dispatch_exec(void *data)
+{
+       RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
+
+       /* Since sub->topic doesn't change, no need to lock sub */
+       ast_assert(dispatch->sub->topic != NULL);
+       ao2_ref(dispatch->sub->topic, +1);
+       sub_topic = dispatch->sub->topic;
+
+       dispatch->sub->callback(dispatch->sub->data,
+                               dispatch->sub,
+                               sub_topic,
+                               dispatch->message);
+
+       return 0;
+}
+
+void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+       struct stasis_subscription **subscribers = NULL;
+       size_t num_subscribers, i;
+
+       /* Copy the subscribers, so we don't have to hold the mutex for long */
+       {
+               SCOPED_AO2LOCK(lock, topic);
+               num_subscribers = topic->num_subscribers_current;
+               subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
+               if (subscribers) {
+                       for (i = 0; i < num_subscribers; ++i) {
+                               ao2_ref(topic->subscribers[i], +1);
+                               subscribers[i] = topic->subscribers[i];
+                       }
+               }
+       }
+
+       if (!subscribers) {
+               ast_log(LOG_ERROR, "Dropping message\n");
+               return;
+       }
+
+       for (i = 0; i < num_subscribers; ++i) {
+               struct stasis_subscription *sub = subscribers[i];
+
+               ast_assert(sub != NULL);
+
+               if (sub->mailbox) {
+                       RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+                       dispatch = dispatch_create(publisher_topic, message, sub);
+                       if (!dispatch) {
+                               ast_log(LOG_DEBUG, "Dropping dispatch\n");
+                               break;
+                       }
+
+                       if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+                               dispatch = NULL; /* Ownership transferred to mailbox */
+                       }
+               } else {
+                       /* No mailbox; dispatch directly */
+                       sub->callback(sub->data, sub, sub->topic, message);
+               }
+       }
+
+       for (i = 0; i < num_subscribers; ++i) {
+               ao2_cleanup(subscribers[i]);
+       }
+       ast_free(subscribers);
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+       stasis_forward_message(topic, topic, message);
+}
+
+/*! \brief Forwarding subscriber */
+static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+       struct stasis_topic *to_topic = data;
+       stasis_forward_message(to_topic, topic, message);
+
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(to_topic);
+       }
+}
+
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
+{
+       struct stasis_subscription *sub;
+       if (!from_topic || !to_topic) {
+               return NULL;
+       }
+       /* Subscribe without a mailbox, since we're just forwarding messages */
+       sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+       if (sub) {
+               /* hold a ref to to_topic for this forwarding subscription */
+               ao2_ref(to_topic, +1);
+       }
+       return sub;
+}
+
+static void subscription_change_dtor(void *obj)
+{
+       struct stasis_subscription_change *change = obj;
+       ast_string_field_free_memory(change);
+       ao2_cleanup(change->topic);
+}
+
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+
+       change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
+       if (ast_string_field_init(change, 128)) {
+               return NULL;
+       }
+
+       ast_string_field_set(change, uniqueid, uniqueid);
+       ast_string_field_set(change, description, description);
+       ao2_ref(topic, +1);
+       change->topic = topic;
+
+       ao2_ref(change, +1);
+       return change;
+}
+
+struct stasis_message_type *stasis_subscription_change(void)
+{
+       return __subscription_change_message_type;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+       change = subscription_change_alloc(topic, uniqueid, description);
+
+       if (!change) {
+               return;
+       }
+
+       msg = stasis_message_create(stasis_subscription_change(), change);
+
+       if (!msg) {
+               return;
+       }
+
+       stasis_publish(topic, msg);
+}
+
+/*! \brief Cleanup function */
+static void stasis_exit(void)
+{
+       ao2_cleanup(__subscription_change_message_type);
+       __subscription_change_message_type = NULL;
+       ast_threadpool_shutdown(pool);
+       pool = NULL;
+}
+
+int stasis_init(void)
+{
+       int cache_init;
+
+       /* XXX Should this be configurable? */
+       struct ast_threadpool_options opts = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 20,
+               .auto_increment = 1,
+               .initial_size = 0,
+               .max_size = 200
+       };
+
+       ast_register_atexit(stasis_exit);
+
+       if (pool) {
+               ast_log(LOG_ERROR, "Stasis double-initialized\n");
+               return -1;
+       }
+
+       pool = ast_threadpool_create("stasis-core", NULL, &opts);
+       if (!pool) {
+               ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
+               return -1;
+       }
+
+       cache_init = stasis_cache_init();
+       if (cache_init != 0) {
+               return -1;
+       }
+
+       __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
+       if (!__subscription_change_message_type) {
+               return -1;
+       }
+
+       return 0;
+}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
new file mode 100644 (file)
index 0000000..2f4cf52
--- /dev/null
@@ -0,0 +1,443 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+       <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/hashtab.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+#ifdef LOW_MEMORY
+#define NUM_CACHE_BUCKETS 17
+#else
+#define NUM_CACHE_BUCKETS 563
+#endif
+
+struct stasis_caching_topic {
+       struct ao2_container *cache;
+       struct stasis_topic *topic;
+       struct stasis_subscription *sub;
+       snapshot_get_id id_fn;
+};
+
+static void stasis_caching_topic_dtor(void *obj) {
+       struct stasis_caching_topic *caching_topic = obj;
+       ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+       caching_topic->sub = NULL;
+       ao2_cleanup(caching_topic->cache);
+       caching_topic->cache = NULL;
+       ao2_cleanup(caching_topic->topic);
+       caching_topic->topic = NULL;
+}
+
+struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
+{
+       return caching_topic->topic;
+}
+
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
+{
+       if (caching_topic) {
+               if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+                       stasis_unsubscribe(caching_topic->sub);
+               } else {
+                       ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
+               }
+       }
+}
+
+struct cache_entry {
+       struct stasis_message_type *type;
+       char *id;
+       struct stasis_message *snapshot;
+};
+
+static void cache_entry_dtor(void *obj)
+{
+       struct cache_entry *entry = obj;
+       ao2_cleanup(entry->type);
+       entry->type = NULL;
+       ast_free(entry->id);
+       entry->id = NULL;
+       ao2_cleanup(entry->snapshot);
+       entry->snapshot = NULL;
+}
+
+static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
+{
+       RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
+
+       ast_assert(type != NULL);
+       ast_assert(id != NULL);
+
+       entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
+       if (!entry) {
+               return NULL;
+       }
+
+       entry->id = ast_strdup(id);
+       if (!entry->id) {
+               return NULL;
+       }
+
+       ao2_ref(type, +1);
+       entry->type = type;
+       if (snapshot != NULL) {
+               ao2_ref(snapshot, +1);
+               entry->snapshot = snapshot;
+       }
+
+       ao2_ref(entry, +1);
+       return entry;
+}
+
+static int cache_entry_hash(const void *obj, int flags)
+{
+       const struct cache_entry *entry = obj;
+       int hash = 0;
+
+       ast_assert(!(flags & OBJ_KEY));
+
+       hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
+       hash += ast_hashtab_hash_string(entry->id);
+       return hash;
+}
+
+static int cache_entry_cmp(void *obj, void *arg, int flags)
+{
+       const struct cache_entry *left = obj;
+       const struct cache_entry *right = arg;
+
+       ast_assert(!(flags & OBJ_KEY));
+
+       if (left->type == right->type && strcmp(left->id, right->id) == 0) {
+               return CMP_MATCH | CMP_STOP;
+       }
+
+       return 0;
+}
+
+static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+{
+       RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
+       RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+       struct stasis_message *old_snapshot = NULL;
+
+       ast_assert(caching_topic->cache != NULL);
+
+       new_entry = cache_entry_create(type, id, new_snapshot);
+
+       if (new_snapshot == NULL) {
+               /* Remove entry from cache */
+               cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+               if (cached_entry) {
+                       old_snapshot = cached_entry->snapshot;
+                       cached_entry->snapshot = NULL;
+               }
+       } else {
+               /* Insert/update cache */
+               SCOPED_AO2LOCK(lock, caching_topic->cache);
+
+               cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+               if (cached_entry) {
+                       /* Update cache. Because objects are moving, no need to update refcounts. */
+                       old_snapshot = cached_entry->snapshot;
+                       cached_entry->snapshot = new_entry->snapshot;
+                       new_entry->snapshot = NULL;
+               } else {
+                       /* Insert into the cache */
+                       ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+               }
+
+       }
+
+       return old_snapshot;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
+{
+       RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
+       RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+
+       ast_assert(caching_topic->cache != NULL);
+
+       search_entry = cache_entry_create(type, id, NULL);
+       if (search_entry == NULL) {
+               return NULL;
+       }
+
+       cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
+       if (cached_entry == NULL) {
+               return NULL;
+       }
+
+       ast_assert(cached_entry->snapshot != NULL);
+       ao2_ref(cached_entry->snapshot, +1);
+       return cached_entry->snapshot;
+}
+
+static struct stasis_message_type *__cache_clear_data;
+
+static struct stasis_message_type *cache_clear_data(void)
+{
+       ast_assert(__cache_clear_data != NULL);
+       return __cache_clear_data;
+}
+
+static struct stasis_message_type *__cache_update;
+
+struct stasis_message_type *stasis_cache_update(void)
+{
+       ast_assert(__cache_update != NULL);
+       return __cache_update;
+}
+
+struct cache_clear_data {
+       struct stasis_message_type *type;
+       char *id;
+};
+
+static void cache_clear_data_dtor(void *obj)
+{
+       struct cache_clear_data *ev = obj;
+       ast_free(ev->id);
+       ev->id = NULL;
+       ao2_cleanup(ev->type);
+       ev->type = NULL;
+}
+
+struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
+{
+       RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+       ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
+       if (!ev) {
+               return NULL;
+       }
+
+       ev->id = ast_strdup(id);
+       if (!ev->id) {
+               return NULL;
+       }
+       ao2_ref(type, +1);
+       ev->type = type;
+
+       msg = stasis_message_create(cache_clear_data(), ev);
+
+       if (!msg) {
+               return NULL;
+       }
+
+       ao2_ref(msg, +1);
+       return msg;
+}
+
+static void stasis_cache_update_dtor(void *obj)
+{
+       struct stasis_cache_update *update = obj;
+       ao2_cleanup(update->topic);
+       update->topic = NULL;
+       ao2_cleanup(update->old_snapshot);
+       update->old_snapshot = NULL;
+       ao2_cleanup(update->new_snapshot);
+       update->new_snapshot = NULL;
+       ao2_cleanup(update->type);
+       update->type = NULL;
+}
+
+static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+{
+       RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+       ast_assert(topic != NULL);
+       ast_assert(old_snapshot != NULL || new_snapshot != NULL);
+
+       update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
+       if (!update) {
+               return NULL;
+       }
+
+       ao2_ref(topic, +1);
+       update->topic = topic;
+       if (old_snapshot) {
+               ao2_ref(old_snapshot, +1);
+               update->old_snapshot = old_snapshot;
+               if (!new_snapshot) {
+                       ao2_ref(stasis_message_type(old_snapshot), +1);
+                       update->type = stasis_message_type(old_snapshot);
+               }
+       }
+       if (new_snapshot) {
+               ao2_ref(new_snapshot, +1);
+               update->new_snapshot = new_snapshot;
+               ao2_ref(stasis_message_type(new_snapshot), +1);
+               update->type = stasis_message_type(new_snapshot);
+       }
+
+       msg = stasis_message_create(stasis_cache_update(), update);
+       if (!msg) {
+               return NULL;
+       }
+
+       ao2_ref(msg, +1);
+       return msg;
+}
+
+static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+       RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+       struct stasis_caching_topic *caching_topic = data;
+       const char *id = NULL;
+
+       ast_assert(caching_topic->topic != NULL);
+       ast_assert(caching_topic->id_fn != NULL);
+
+       if (stasis_subscription_final_message(sub, message)) {
+               caching_topic_needs_unref = caching_topic;
+       }
+
+       /* Handle cache clear event */
+       if (cache_clear_data() == stasis_message_type(message)) {
+               RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+               RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+               struct cache_clear_data *clear = stasis_message_data(message);
+               ast_assert(clear->type != NULL);
+               ast_assert(clear->id != NULL);
+               old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
+               if (old_snapshot) {
+                       update = update_create(topic, old_snapshot, NULL);
+                       stasis_publish(caching_topic->topic, update);
+               } else {
+                       ast_log(LOG_ERROR,
+                               "Attempting to remove an item from the cache that isn't there: %s %s\n",
+                               stasis_message_type_name(clear->type), clear->id);
+               }
+               return;
+       }
+
+       id = caching_topic->id_fn(message);
+       if (id == NULL) {
+               /* Object isn't cached; forward */
+               stasis_forward_message(caching_topic->topic, topic, message);
+       } else {
+               /* Update the cache */
+               RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
+               RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
+
+               old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+
+               update = update_create(topic, old_snapshot, message);
+               if (update == NULL) {
+                       return;
+               }
+
+               stasis_publish(caching_topic->topic, update);
+       }
+
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(caching_topic);
+       }
+}
+
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+{
+       RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+       struct stasis_subscription *sub;
+       RAII_VAR(char *, new_name, NULL, free);
+       int ret;
+
+       ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+       if (ret < 0) {
+               return NULL;
+       }
+
+       caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
+       if (caching_topic == NULL) {
+               return NULL;
+       }
+
+       caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
+       if (!caching_topic->cache) {
+               ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
+               return NULL;
+       }
+
+       caching_topic->topic = stasis_topic_create(new_name);
+       if (caching_topic->topic == NULL) {
+               return NULL;
+       }
+
+       caching_topic->id_fn = id_fn;
+
+       sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+       if (sub == NULL) {
+               return NULL;
+       }
+       /* This is for the reference contained in the subscription above */
+       ao2_ref(caching_topic, +1);
+       caching_topic->sub = sub;
+
+       ao2_ref(caching_topic, +1);
+       return caching_topic;
+}
+
+static void stasis_cache_exit(void)
+{
+       ao2_cleanup(__cache_clear_data);
+       __cache_clear_data = NULL;
+       ao2_cleanup(__cache_update);
+       __cache_update = NULL;
+}
+
+int stasis_cache_init(void)
+{
+       ast_register_atexit(stasis_cache_exit);
+
+       if (__cache_clear_data || __cache_update) {
+               ast_log(LOG_ERROR, "Stasis cache double initialized\n");
+               return -1;
+       }
+
+       __cache_update = stasis_message_type_create("stasis_cache_update");
+       if (!__cache_update) {
+               return -1;
+       }
+
+       __cache_clear_data = stasis_message_type_create("StasisCacheClear");
+       if (!__cache_clear_data) {
+               return -1;
+       }
+       return 0;
+}
+
diff --git a/main/stasis_message.c b/main/stasis_message.c
new file mode 100644 (file)
index 0000000..8d397b9
--- /dev/null
@@ -0,0 +1,135 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis Message API.
+ *
+ * \author David M. Lee, II <dlee@digium.com>
+ */
+
+/*** MODULEINFO
+       <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+#include "asterisk/utils.h"
+
+/*! \private */
+struct stasis_message_type {
+       char *name;
+};
+
+static void message_type_dtor(void *obj)
+{
+       struct stasis_message_type *type = obj;
+       ast_free(type->name);
+       type->name = NULL;
+}
+
+struct stasis_message_type *stasis_message_type_create(const char *name)
+{
+       RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+
+       type = ao2_alloc(sizeof(*type), message_type_dtor);
+       if (!type) {
+               return NULL;
+       }
+
+       type->name = ast_strdup(name);
+       if (!type->name) {
+               return NULL;
+       }
+
+       ao2_ref(type, +1);
+       return type;
+}
+
+const char *stasis_message_type_name(const struct stasis_message_type *type)
+{
+       return type->name;
+}
+
+/*! \private */
+struct stasis_message {
+       /*! Time the message was created */
+       struct timeval timestamp;
+       /*! Type of the message */
+       struct stasis_message_type *type;
+       /*! Message content */
+       void *data;
+};
+
+static void stasis_message_dtor(void *obj)
+{
+       struct stasis_message *message = obj;
+       ao2_cleanup(message->type);
+       ao2_cleanup(message->data);
+}
+
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+{
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+       if (type == NULL || data == NULL) {
+               return NULL;
+       }
+
+       message = ao2_alloc(sizeof(*message), stasis_message_dtor);
+       if (message == NULL) {
+               return NULL;
+       }
+
+       message->timestamp = ast_tvnow();
+       ao2_ref(type, +1);
+       message->type = type;
+       ao2_ref(data, +1);
+       message->data = data;
+
+       ao2_ref(message, +1);
+       return message;
+}
+
+struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)
+{
+       if (msg == NULL) {
+               return NULL;
+       }
+       return msg->type;
+}
+
+void *stasis_message_data(const struct stasis_message *msg)
+{
+       if (msg == NULL) {
+               return NULL;
+       }
+       return msg->data;
+}
+
+const struct timeval *stasis_message_timestamp(const struct stasis_message *msg)
+{
+       if (msg == NULL) {
+               return NULL;
+       }
+       return &msg->timestamp;
+}
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
new file mode 100644 (file)
index 0000000..b052641
--- /dev/null
@@ -0,0 +1,683 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file \brief Test Stasis message bus.
+ *
+ * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
+ *
+ * \ingroup tests
+ */
+
+/*** MODULEINFO
+       <depend>TEST_FRAMEWORK</depend>
+       <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/astobj2.h"
+#include "asterisk/module.h"
+#include "asterisk/stasis.h"
+#include "asterisk/test.h"
+
+static const char *test_category = "/stasis/core/";
+
+AST_TEST_DEFINE(message_type)
+{
+       RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test basic message_type functions";
+               info->description = "Test basic message_type functions";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       ast_test_validate(test, NULL == stasis_message_type_create(NULL));
+       uut = stasis_message_type_create("SomeMessage");
+       ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(message)
+{
+       RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
+       RAII_VAR(char *, data, NULL, ao2_cleanup);
+       char *expected = "SomeData";
+       struct timeval expected_timestamp;
+       struct timeval time_diff;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test basic message functions";
+               info->description = "Test basic message functions";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+
+       type = stasis_message_type_create("SomeMessage");
+
+       ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
+       ast_test_validate(test, NULL == stasis_message_create(type, NULL));
+
+       data = ao2_alloc(strlen(expected) + 1, NULL);
+       strcpy(data, expected);
+       expected_timestamp = ast_tvnow();
+       uut = stasis_message_create(type, data);
+
+       ast_test_validate(test, NULL != uut);
+       ast_test_validate(test, type == stasis_message_type(uut));
+       ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
+       ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
+
+       time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
+       /* 10ms is certainly long enough for the two calls to complete */
+       ast_test_validate(test, time_diff.tv_sec == 0);
+       ast_test_validate(test, time_diff.tv_usec < 10000);
+
+       ao2_ref(uut, -1);
+       uut = NULL;
+       ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
+
+       return AST_TEST_PASS;
+}
+
+struct consumer {
+       ast_mutex_t lock;
+       ast_cond_t out;
+       struct stasis_message **messages_rxed;
+       size_t messages_rxed_len;
+       int ignore_subscriptions;
+       int complete;
+};
+
+static void consumer_dtor(void *obj) {
+       struct consumer *consumer = obj;
+
+       ast_mutex_destroy(&consumer->lock);
+       ast_cond_destroy(&consumer->out);
+
+       while (consumer->messages_rxed_len > 0) {
+               ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
+       }
+       ast_free(consumer->messages_rxed);
+       consumer->messages_rxed = NULL;
+}
+
+static struct consumer *consumer_create(int ignore_subscriptions) {
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+       consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
+
+       if (!consumer) {
+               return NULL;
+       }
+
+       consumer->ignore_subscriptions = ignore_subscriptions;
+       consumer->messages_rxed = ast_malloc(0);
+       if (!consumer->messages_rxed) {
+               return NULL;
+       }
+
+       ast_mutex_init(&consumer->lock);
+       ast_cond_init(&consumer->out, NULL);
+
+       ao2_ref(consumer, +1);
+       return consumer;
+}
+
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+{
+       struct consumer *consumer = data;
+       RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
+       SCOPED_MUTEX(lock, &consumer->lock);
+
+       if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change()) {
+
+               ++consumer->messages_rxed_len;
+               consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
+               ast_assert(consumer->messages_rxed != NULL);
+               consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
+               ao2_ref(message, +1);
+       }
+
+       if (stasis_subscription_final_message(sub, message)) {
+               consumer->complete = 1;
+               consumer_needs_cleanup = consumer;
+       }
+
+       ast_cond_signal(&consumer->out);
+}
+
+static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
+{
+       struct timeval start = ast_tvnow();
+       struct timespec end = {
+               .tv_sec = start.tv_sec + 30,
+               .tv_nsec = start.tv_usec * 1000
+       };
+
+       SCOPED_MUTEX(lock, &consumer->lock);
+
+       while (consumer->messages_rxed_len < expected_len) {
+               int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+               if (r == ETIMEDOUT) {
+                       break;
+               }
+               ast_assert(r == 0); /* Not expecting any othet types of errors */
+       }
+       return consumer->messages_rxed_len;
+}
+
+static int consumer_wait_for_completion(struct consumer *consumer)
+{
+       struct timeval start = ast_tvnow();
+       struct timespec end = {
+               .tv_sec = start.tv_sec + 30,
+               .tv_nsec = start.tv_usec * 1000
+       };
+
+       SCOPED_MUTEX(lock, &consumer->lock);
+
+       while (!consumer->complete) {
+               int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+               if (r == ETIMEDOUT) {
+                       break;
+               }
+               ast_assert(r == 0); /* Not expecting any othet types of errors */
+       }
+       return consumer->complete;
+}
+
+static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
+{
+       struct timeval start = ast_tvnow();
+       struct timeval diff = {
+               .tv_sec = 0,
+               .tv_usec = 100000 /* wait for 100ms */
+       };
+       struct timeval end_tv = ast_tvadd(start, diff);
+       struct timespec end = {
+               .tv_sec = end_tv.tv_sec,
+               .tv_nsec = end_tv.tv_usec * 1000
+       };
+
+       SCOPED_MUTEX(lock, &consumer->lock);
+
+       while (consumer->messages_rxed_len == expected_len) {
+               int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+               if (r == ETIMEDOUT) {
+                       break;
+               }
+               ast_assert(r == 0); /* Not expecting any othet types of errors */
+       }
+       return consumer->messages_rxed_len;
+}
+
+AST_TEST_DEFINE(subscription_messages)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+       int complete;
+       struct stasis_subscription_change *change;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test subscribe/unsubscribe messages";
+               info->description = "Test subscribe/unsubscribe messages";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(0);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+       expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+       stasis_unsubscribe(uut);
+       uut = NULL;
+       complete = consumer_wait_for_completion(consumer);
+       ast_test_validate(test, 1 == complete);
+
+       ast_test_validate(test, 2 == consumer->messages_rxed_len);
+       ast_test_validate(test, stasis_subscription_change() == stasis_message_type(consumer->messages_rxed[0]));
+       ast_test_validate(test, stasis_subscription_change() == stasis_message_type(consumer->messages_rxed[1]));
+
+       change = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, topic == change->topic);
+       ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+       ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+       change = stasis_message_data(consumer->messages_rxed[1]);
+       ast_test_validate(test, topic == change->topic);
+       ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+       ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(publish)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       int actual_len;
+       const char *actual;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test simple subscriptions";
+               info->description = "Test simple subscriptions";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message_type = stasis_message_type_create("TestMessage");
+       test_message = stasis_message_create(test_message_type, test_data);
+
+       stasis_publish(topic, test_message);
+
+       actual_len = consumer_wait_for(consumer, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, test_data == actual);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(unsubscribe_stops_messages)
+{
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       int actual_len;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test simple subscriptions";
+               info->description = "Test simple subscriptions";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+
+       uut = stasis_subscribe(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != uut);
+       ao2_ref(consumer, +1);
+
+       stasis_unsubscribe(uut);
+       uut = NULL;
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message_type = stasis_message_type_create("TestMessage");
+       test_message = stasis_message_create(test_message_type, test_data);
+
+       stasis_publish(topic, test_message);
+
+       actual_len = consumer_should_stay(consumer, 0);
+       ast_test_validate(test, 0 == actual_len);
+
+       return AST_TEST_PASS;
+}
+
+
+AST_TEST_DEFINE(forward)
+{
+       RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+
+       RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+       RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+       RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       int actual_len;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test sending events to a parent topic";
+               info->description = "Test sending events to a parent topic.\n"
+                       "This test creates three topics (one parent, two children)\n"
+                       "and publishes a message to one child, and verifies it's\n"
+                       "only seen by that child and the parent";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       parent_topic = stasis_topic_create("ParentTestTopic");
+       ast_test_validate(test, NULL != parent_topic);
+       topic = stasis_topic_create("TestTopic");
+       ast_test_validate(test, NULL != topic);
+
+       forward_sub = stasis_forward_all(topic, parent_topic);
+       ast_test_validate(test, NULL != forward_sub);
+
+       parent_consumer = consumer_create(1);
+       ast_test_validate(test, NULL != parent_consumer);
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+
+       parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
+       ast_test_validate(test, NULL != parent_sub);
+       ao2_ref(parent_consumer, +1);
+       sub = stasis_subscribe(topic, consumer_exec, consumer);
+       ast_test_validate(test, NULL != sub);
+       ao2_ref(consumer, +1);
+
+       test_data = ao2_alloc(1, NULL);
+       ast_test_validate(test, NULL != test_data);
+       test_message_type = stasis_message_type_create("TestMessage");
+       test_message = stasis_message_create(test_message_type, test_data);
+
+       stasis_publish(topic, test_message);
+
+       actual_len = consumer_wait_for(consumer, 1);
+       ast_test_validate(test, 1 == actual_len);
+       actual_len = consumer_wait_for(parent_consumer, 1);
+       ast_test_validate(test, 1 == actual_len);
+
+       return AST_TEST_PASS;
+}
+
+struct cache_test_data {
+       char *id;
+       char *value;
+};
+
+static void cache_test_data_dtor(void *obj)
+{
+       struct cache_test_data *data = obj;
+       ast_free(data->id);
+       ast_free(data->value);
+}
+
+static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
+{
+       RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
+
+       data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
+       if (data == NULL) {
+               return NULL;
+       }
+
+       ast_assert(name != NULL);
+       ast_assert(value != NULL);
+
+       data->id = ast_strdup(name);
+       data->value = ast_strdup(value);
+       if (!data->id || !data->value) {
+               return NULL;
+       }
+
+       return stasis_message_create(type, data);
+}
+
+static const char *cache_test_data_id(struct stasis_message *message) {
+       struct cache_test_data *cachable = stasis_message_data(message);
+
+       if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
+               return NULL;
+       }
+       return cachable->id;
+}
+
+AST_TEST_DEFINE(cache_passthrough)
+{
+       RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+       int actual_len;
+       struct stasis_message_type *actual_type;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test passing messages through cache topic unscathed.";
+               info->description = "Test passing messages through cache topic unscathed.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       non_cache_type = stasis_message_type_create("NonCacheable");
+       ast_test_validate(test, NULL != non_cache_type);
+       topic = stasis_topic_create("SomeTopic");
+       ast_test_validate(test, NULL != topic);
+       caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+       ast_test_validate(test, NULL != caching_topic);
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+       sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
+       ast_test_validate(test, NULL != sub);
+       ao2_ref(consumer, +1);
+
+       test_message = cache_test_message_create(non_cache_type, "1", "1");
+       ast_test_validate(test, NULL != test_message);
+
+       stasis_publish(topic, test_message);
+
+       actual_len = consumer_wait_for(consumer, 1);
+       ast_test_validate(test, 1 == actual_len);
+
+       actual_type = stasis_message_type(consumer->messages_rxed[0]);
+       ast_test_validate(test, non_cache_type == actual_type);
+
+       ast_test_validate(test, test_message == consumer->messages_rxed[0]);
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache)
+{
+       RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+       RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+       RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
+       int actual_len;
+       struct stasis_cache_update *actual_update;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test passing messages through cache topic unscathed.";
+               info->description = "Test passing messages through cache topic unscathed.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       cache_type = stasis_message_type_create("Cacheable");
+       ast_test_validate(test, NULL != cache_type);
+       topic = stasis_topic_create("SomeTopic");
+       ast_test_validate(test, NULL != topic);
+       caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+       ast_test_validate(test, NULL != caching_topic);
+       consumer = consumer_create(1);
+       ast_test_validate(test, NULL != consumer);
+       sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
+       ast_test_validate(test, NULL != sub);
+       ao2_ref(consumer, +1);
+
+       test_message1_1 = cache_test_message_create(cache_type, "1", "1");
+       ast_test_validate(test, NULL != test_message1_1);
+       test_message2_1 = cache_test_message_create(cache_type, "2", "1");
+       ast_test_validate(test, NULL != test_message2_1);
+
+       /* Post a couple of snapshots */
+       stasis_publish(topic, test_message1_1);
+       stasis_publish(topic, test_message2_1);
+       actual_len = consumer_wait_for(consumer, 2);
+       ast_test_validate(test, 2 == actual_len);
+
+       /* Check for new snapshot messages */
+       ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0]));
+       actual_update = stasis_message_data(consumer->messages_rxed[0]);
+       ast_test_validate(test, topic == actual_update->topic);
+       ast_test_validate(test, NULL == actual_update->old_snapshot);
+       ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
+       ast_test_validate(test, test_message1_1 == stasis_cache_get(caching_topic, cache_type, "1"));
+       /* stasis_cache_get returned a ref, so unref test_message1_1 */
+       ao2_ref(test_message1_1, -1);
+
+       ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[1]));
+       actual_update = stasis_message_data(consumer->messages_rxed[1]);
+       ast_test_validate(test, topic == actual_update->topic);
+       ast_test_validate(test, NULL == actual_update->old_snapshot);
+       ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
+       ast_test_validate(test, test_message2_1 == stasis_cache_get(caching_topic, cache_type, "2"));
+       /* stasis_cache_get returned a ref, so unref test_message2_1 */
+       ao2_ref(test_message2_1, -1);
+
+       /* Update snapshot 2 */
+       test_message2_2 = cache_test_message_create(cache_type, "2", "2");
+       ast_test_validate(test, NULL != test_message2_2);
+       stasis_publish(topic, test_message2_2);
+
+       actual_len = consumer_wait_for(consumer, 3);
+       ast_test_validate(test, 3 == actual_len);
+
+       actual_update = stasis_message_data(consumer->messages_rxed[2]);
+       ast_test_validate(test, topic == actual_update->topic);
+       ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
+       ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
+       ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2"));
+       /* stasis_cache_get returned a ref, so unref test_message2_2 */
+       ao2_ref(test_message2_2, -1);
+
+       /* Clear snapshot 1 */
+       test_message1_clear = stasis_cache_clear_create(cache_type, "1");
+       ast_test_validate(test, NULL != test_message1_clear);
+       stasis_publish(topic, test_message1_clear);
+
+       actual_len = consumer_wait_for(consumer, 4);
+       ast_test_validate(test, 4 == actual_len);
+
+       actual_update = stasis_message_data(consumer->messages_rxed[3]);
+       ast_test_validate(test, topic == actual_update->topic);
+       ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
+       ast_test_validate(test, NULL == actual_update->new_snapshot);
+       ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
+
+       return AST_TEST_PASS;
+}
+
+static int unload_module(void)
+{
+       AST_TEST_UNREGISTER(message_type);
+       AST_TEST_UNREGISTER(message);
+       AST_TEST_UNREGISTER(subscription_messages);
+       AST_TEST_UNREGISTER(publish);
+       AST_TEST_UNREGISTER(unsubscribe_stops_messages);
+       AST_TEST_UNREGISTER(forward);
+       AST_TEST_UNREGISTER(cache_passthrough);
+       AST_TEST_UNREGISTER(cache);
+       return 0;
+}
+
+static int load_module(void)
+{
+       AST_TEST_REGISTER(message_type);
+       AST_TEST_REGISTER(message);
+       AST_TEST_REGISTER(subscription_messages);
+       AST_TEST_REGISTER(publish);
+       AST_TEST_REGISTER(unsubscribe_stops_messages);
+       AST_TEST_REGISTER(forward);
+       AST_TEST_REGISTER(cache_passthrough);
+       AST_TEST_REGISTER(cache);
+       return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
+               .load = load_module,
+               .unload = unload_module
+       );