/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2013, Digium, Inc.
 *
 * David M. Lee, II <dlee@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

#ifndef _ASTERISK_STASIS_H
#define _ASTERISK_STASIS_H

/*! \file
 *
 * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
 * detailed documentation.
 *
 * \author David M. Lee, II <dlee@digium.com>
 *
 * \page stasis Stasis Message Bus API
 *
 * The Stasis Message Bus is a loosely typed mechanism for distributing messages
 * within Asterisk. It is designed to be:
 *  - Loosely coupled; new message types can be added in separate modules.
 *  - Easy to use; publishing and subscribing are straightforward operations.
 *
 * There are three main concepts for using the Stasis Message Bus:
 *  - \ref stasis_message
 *  - \ref stasis_topic
 *  - \ref stasis_subscription
 *
 * \par stasis_message
 *
 * Central to the Stasis Message Bus is the \ref stasis_message, the messages
 * that are sent on the bus. These messages have:
 *  - a type (as defined by a \ref stasis_message_type)
 *  - a value - a \c void pointer to an AO2 object
 *  - a timestamp when it was created
 *
 * Once a \ref stasis_message has been created, it is immutable and cannot
 * change. The same goes for the value of the message (although this cannot be
 * enforced in code). Messages themselves are reference-counted AO2 objects,
 * along with their values. By being both reference counted and immutable,
 * messages can be shared throughout the system without any concerns for
 * thread safety.
 *
 * The type of a message is defined by an instance of \ref stasis_message_type,
 * which can be created by calling stasis_message_type_create(). Message types
 * are named, which is useful in debugging. It is recommended that the string
 * name for a message type match the name of the struct that's stored in the
 * message. For example, the name for \ref stasis_cache_update's message type is
 * \c "stasis_cache_update".
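 *
 * As a rough illustration of why immutability combined with reference counting
 * makes sharing safe, the sketch below models a message in plain C. It is not
 * the Asterisk implementation: \c toy_message and its functions are
 * hypothetical stand-ins, and the reference count is a plain, non-atomic
 * \c int rather than an AO2 object.
 *
```c
#include <assert.h>
#include <stdlib.h>
#include <time.h>

// Hypothetical stand-in for a message: a type name, an opaque payload and
// a creation timestamp, all fixed when the message is created.
struct toy_message {
	const char *type_name;	// stands in for a stasis_message_type
	const void *data;	// payload; never modified after creation
	time_t created;		// set once in toy_message_create()
	int refcount;		// simplified, non-atomic reference count
};

// Allocate a message holding one reference; returns NULL on failure.
static struct toy_message *toy_message_create(const char *type_name, const void *data)
{
	struct toy_message *msg = malloc(sizeof(*msg));

	if (!msg) {
		return NULL;
	}
	msg->type_name = type_name;
	msg->data = data;
	msg->created = time(NULL);
	msg->refcount = 1;
	return msg;
}

// Take an additional reference so that another holder can share the message.
static struct toy_message *toy_message_ref(struct toy_message *msg)
{
	msg->refcount++;
	return msg;
}

// Drop one reference. Frees the message when the count reaches zero and
// returns the number of references that remain.
static int toy_message_unref(struct toy_message *msg)
{
	int remaining = --msg->refcount;

	if (remaining == 0) {
		free(msg);
	}
	return remaining;
}
```
 *
 * In this sketch every holder only reads the fields, so the sole shared state
 * that changes is the reference count. That is the piece a real implementation
 * has to update atomically.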
 *
 * \par stasis_topic
 *
 * A \ref stasis_topic is an object to which subscribers may subscribe (see
 * \ref stasis_subscription), and to which \ref stasis_message's may be
 * published. Any message published to the topic is dispatched to all of its
 * subscribers. The topic itself may be named, which is useful in debugging.
 *
 * Topics themselves are reference counted objects. Since topics are referred to
 * by their subscribers, they will not be freed until all of their subscribers
 * have unsubscribed. Topics are also thread safe, so there is no need to worry
 * about publishing, subscribing or unsubscribing to a topic concurrently from
 * multiple threads. Topics are also designed to handle the case of
 * unsubscribing from a topic from within the subscription handler.
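 *
 * The dispatch rule described above (every published message reaches every
 * subscriber) can be sketched in plain C as follows. All \c toy_ names are
 * hypothetical. Unlike a real topic, this model does no locking and invokes
 * handlers synchronously on the publisher's thread.
 *
```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_SUBS 8

struct toy_topic;

// Handler signature: subscriber data, the topic, and the message text.
typedef void (*toy_cb)(void *data, struct toy_topic *topic, const char *message);

struct toy_topic {
	const char *name;		// useful when debugging
	toy_cb cbs[TOY_MAX_SUBS];	// registered handlers
	void *datas[TOY_MAX_SUBS];	// data handed back to each handler
	size_t count;			// number of registered handlers
};

// Register a handler; returns 0 on success, -1 if the topic is full.
static int toy_subscribe(struct toy_topic *topic, toy_cb cb, void *data)
{
	if (topic->count == TOY_MAX_SUBS) {
		return -1;
	}
	topic->cbs[topic->count] = cb;
	topic->datas[topic->count] = data;
	topic->count++;
	return 0;
}

// Dispatch the message to every subscriber of the topic.
static void toy_publish(struct toy_topic *topic, const char *message)
{
	for (size_t i = 0; i < topic->count; i++) {
		topic->cbs[i](topic->datas[i], topic, message);
	}
}

// Example handler: counts deliveries in the int that data points to.
static void toy_count_cb(void *data, struct toy_topic *topic, const char *message)
{
	int *counter = data;

	(void) topic;
	(void) message;
	(*counter)++;
}
```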
 *
 * There is one special case of topics that's worth noting: forwarding
 * messages. It's a fairly common use case to want to forward all the messages
 * published on one topic to another one (for example, an aggregator topic that
 * publishes all the events from a set of other topics). This can be
 * accomplished easily using stasis_forward_all(). This sets up the forwarding
 * between the two topics, and returns a \ref stasis_subscription, which can be
 * unsubscribed to stop the forwarding.
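 *
 * One way to picture forwarding is as an ordinary subscription whose handler
 * republishes each message on the destination topic while preserving the
 * topic of origin. The sketch below models that in plain C. The \c toy_ names
 * are hypothetical, and the model is synchronous and single threaded, so it
 * says nothing about how stasis_forward_all() is actually implemented.
 *
```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_SUBS 4

struct toy_topic;

// Handlers are told which topic the message was originally published on.
typedef void (*toy_cb)(void *data, struct toy_topic *origin, const char *message);

struct toy_topic {
	const char *name;
	toy_cb cbs[TOY_MAX_SUBS];
	void *datas[TOY_MAX_SUBS];
	size_t count;
};

// Register a handler; returns 0 on success, -1 if the topic is full.
static int toy_subscribe(struct toy_topic *topic, toy_cb cb, void *data)
{
	if (topic->count == TOY_MAX_SUBS) {
		return -1;
	}
	topic->cbs[topic->count] = cb;
	topic->datas[topic->count] = data;
	topic->count++;
	return 0;
}

// Deliver a message to the subscribers of topic, reporting origin as the
// topic on which it was first published.
static void toy_dispatch(struct toy_topic *topic, struct toy_topic *origin,
	const char *message)
{
	for (size_t i = 0; i < topic->count; i++) {
		topic->cbs[i](topic->datas[i], origin, message);
	}
}

// Publishing is dispatching with the topic as its own origin.
static void toy_publish(struct toy_topic *topic, const char *message)
{
	toy_dispatch(topic, topic, message);
}

// Forwarding handler: data is the destination topic.
static void toy_forward_cb(void *data, struct toy_topic *origin, const char *message)
{
	toy_dispatch(data, origin, message);
}

// Forward everything published on from to the subscribers of to.
static int toy_forward_all(struct toy_topic *from, struct toy_topic *to)
{
	return toy_subscribe(from, toy_forward_cb, to);
}

// Example handler: remembers the origin of the last message it received.
static void toy_record_origin_cb(void *data, struct toy_topic *origin,
	const char *message)
{
	struct toy_topic **last = data;

	(void) message;
	*last = origin;
}
```
 *
 * A subscriber of the aggregator topic therefore sees the source topic as the
 * origin of a forwarded message, not the aggregator it subscribed to.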
 *
 * Another common use case is to want to cache certain messages that are
 * published on the bus. Usually these events are snapshots of the current state
 * in the system, and it's desirable to query that state from the cache without
 * locking the original object. It's also desirable for subscribers of the
 * caching topic to receive messages that have both the old cache value and the
 * new value being put into the cache. For this, call
 * stasis_caching_topic_create(), providing it with the topic which publishes
 * the messages that you wish to cache, and a function that can identify
 * cacheable messages.
 *
 * The returned \ref stasis_caching_topic provides a topic that forwards
 * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
 * stasis_cache_update message which provides the old snapshot (or \c NULL if
 * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
 * removed from the cache). A message created with stasis_cache_clear_create()
 * must be sent to the topic in order to remove entries from the cache.
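 *
 * The old/new snapshot convention can be sketched with a small keyed cache in
 * plain C. Everything named \c toy_ is hypothetical, snapshots are plain
 * strings rather than messages, and clearing is modelled by storing \c NULL.
 * Read it as an illustration of the update semantics only, not as the caching
 * topic's implementation.
 *
```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_CACHE_SLOTS 8

// Hypothetical counterpart of a cache update: the value before and after.
struct toy_cache_update {
	const char *old_snapshot;	// NULL when the entry is new
	const char *new_snapshot;	// NULL when the entry was removed
};

struct toy_entry {
	const char *id;			// NULL marks a free slot
	const char *snapshot;
};

struct toy_cache {
	struct toy_entry entries[TOY_CACHE_SLOTS];
};

// Return the slot caching id, or NULL if id is not cached.
static struct toy_entry *toy_cache_find(struct toy_cache *cache, const char *id)
{
	for (size_t i = 0; i < TOY_CACHE_SLOTS; i++) {
		struct toy_entry *entry = &cache->entries[i];

		if (entry->id && strcmp(entry->id, id) == 0) {
			return entry;
		}
	}
	return NULL;
}

// Cache snapshot under id, or clear the entry when snapshot is NULL, and
// describe the change in update. Returns 0 on success, -1 if the cache is full.
static int toy_cache_put(struct toy_cache *cache, const char *id,
	const char *snapshot, struct toy_cache_update *update)
{
	struct toy_entry *entry = toy_cache_find(cache, id);
	const char *old_snapshot = entry ? entry->snapshot : NULL;

	if (!snapshot) {
		if (entry) {
			entry->id = NULL;
			entry->snapshot = NULL;
		}
	} else {
		// Claim the first free slot if the id is not cached yet.
		for (size_t i = 0; !entry && i < TOY_CACHE_SLOTS; i++) {
			if (!cache->entries[i].id) {
				entry = &cache->entries[i];
				entry->id = id;
			}
		}
		if (!entry) {
			return -1;
		}
		entry->snapshot = snapshot;
	}
	update->old_snapshot = old_snapshot;
	update->new_snapshot = snapshot;
	return 0;
}
```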
 *
 * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
 * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
 * stasis_caching_topic will not be freed until after it has been unsubscribed,
 * and all other ao2_ref()'s have been cleaned up.
 *
 * \par stasis_subscriber
 *
 * Any topic may be subscribed to by simply providing stasis_subscribe() the
 * \ref stasis_topic to subscribe to, a handler function and a \c void pointer
 * to data that is passed back to the handler. Invocations on the subscription's
 * handler are serialized, but different invocations may occur on different
 * threads (this usually isn't important unless you use thread locals or
 * something similar).
 *
 * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
 * stasis_subscription. Due to cyclic references, the \ref
 * stasis_subscription will not be freed until after it has been unsubscribed,
 * and all other ao2_ref()'s have been cleaned up.
 *
 * Subscriptions have two options for unsubscribing, depending upon the context
 * in which you need to unsubscribe.
 *
 * If your subscription is owned by a module, and you must unsubscribe from the
 * module_unload() function, then you'll want to use the
 * stasis_unsubscribe_and_join() function. This will block until the final
 * message has been received on the subscription. Otherwise, there's the danger
 * of invoking the callback function after it has been unloaded.
 *
 * If your subscription is owned by an object, then your object should have an
 * explicit shutdown() function, which calls stasis_unsubscribe(). In your
 * subscription handler, when the stasis_subscription_final_message() has been
 * received, decrement the refcount on your object. In your object's destructor,
 * you may assert stasis_subscription_is_done() to validate that the
 * subscription's callback will no longer be invoked.
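 *
 * The object-owned pattern can be sketched in plain C as below. This is a
 * single-threaded model with hypothetical \c toy_ names: \c
 * toy_dispatch_pending() stands in for whatever thread eventually delivers the
 * final message, and the object starts with two references, one for its owner
 * and one held on behalf of the subscription.
 *
```c
#include <assert.h>
#include <stdbool.h>

struct toy_sub;

typedef void (*toy_cb)(void *data, struct toy_sub *sub, bool is_final);

struct toy_sub {
	toy_cb cb;
	void *data;
	bool subscribed;	// false once toy_unsubscribe() has been called
	bool final_pending;	// the final message is queued, not yet delivered
	bool done;		// the final message has reached the callback
};

// Cancel the subscription. Returns immediately; the final message is only queued.
static void toy_unsubscribe(struct toy_sub *sub)
{
	if (sub->subscribed) {
		sub->subscribed = false;
		sub->final_pending = true;
	}
}

// Stand-in for the dispatcher: delivers the queued final message, if any.
static void toy_dispatch_pending(struct toy_sub *sub)
{
	if (sub->final_pending) {
		sub->final_pending = false;
		sub->done = true;	// done even while the final message is processed
		sub->cb(sub->data, sub, true);
	}
}

// Hypothetical object that owns a subscription.
struct toy_object {
	int refcount;
	bool destroyed;
	struct toy_sub sub;
};

// Drop a reference; the destructor never blocks, it only checks that the
// subscription's callback can no longer run.
static void toy_object_unref(struct toy_object *obj)
{
	if (--obj->refcount == 0) {
		assert(obj->sub.done);
		obj->destroyed = true;
	}
}

// Subscription handler: releases the subscription's reference on the final message.
static void toy_object_cb(void *data, struct toy_sub *sub, bool is_final)
{
	(void) sub;
	if (is_final) {
		toy_object_unref(data);
	}
}

static void toy_object_init(struct toy_object *obj)
{
	obj->refcount = 2;	// owner + subscription
	obj->destroyed = false;
	obj->sub = (struct toy_sub) { .cb = toy_object_cb, .data = obj, .subscribed = true };
}

// Explicit shutdown: requests the unsubscribe and returns without waiting.
static void toy_object_shutdown(struct toy_object *obj)
{
	toy_unsubscribe(&obj->sub);
}
```
 *
 * In this model the owner may drop its reference right after shutdown. The
 * object is destroyed only once the final message has been handled, and
 * destruction never has to wait.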
 *
 * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
 * an object's destructor. While code that does this may work most of the time,
 * it's got one big downside. There's a general assumption that object
 * destruction is non-blocking. If you block the destruction waiting for the
 * subscription to complete, there's the danger that the subscription may
 * process a message which will bump the refcount up by one. Then it does
 * whatever it does, decrements the refcount, which then proceeds to re-destroy
 * the object. Now you've got hard to reproduce bugs that only show up under
 * load.
 */

#include "asterisk/utils.h"

/*!
 * \brief Metadata about a \ref stasis_message.
 */
struct stasis_message_type;

/*!
 * \brief Register a new message type.
 *
 * \ref stasis_message_type is an AO2 object, so ao2_cleanup() it when you're
 * done with it.
 *
 * \param name Name of the new type.
 * \return Pointer to the new type.
 * \return \c NULL on error.
 */
struct stasis_message_type *stasis_message_type_create(const char *name);

/*!
 * \brief Gets the name of a given message type
 * \param type The type to get.
 * \return Name of the type.
 * \return \c NULL if \a type is \c NULL.
 */
const char *stasis_message_type_name(const struct stasis_message_type *type);

/*!
 * \brief Opaque type for a Stasis message.
 */
struct stasis_message;

/*!
 * \brief Create a new message.
 *
 * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
 * with it. Messages are also immutable, and must not be modified after they
 * are initialized. This applies especially to the \a data in the message.
 *
 * \param type Type of the message
 * \param data Immutable data that is the actual contents of the message
 * \return New message
 * \return \c NULL on error
 */
struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);

/*!
 * \brief Get the message type for a \ref stasis_message.
 * \param msg Message whose type to get.
 * \return Type of \a msg
 * \return \c NULL if \a msg is \c NULL.
 */
struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);

/*!
 * \brief Get the data contained in a message.
 * \param msg Message.
 * \return Immutable data pointer
 * \return \c NULL if \a msg is \c NULL.
 */
void *stasis_message_data(const struct stasis_message *msg);

/*!
 * \brief Get the time when a message was created.
 * \param msg Message.
 * \return Pointer to the \a timeval when the message was created.
 * \return \c NULL if \a msg is \c NULL.
 */
const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);

/*!
 * \brief A topic to which messages may be published, and to which subscribers
 * may subscribe.
 */
struct stasis_topic;

/*!
 * \brief Create a new topic.
 * \param name Name of the new topic.
 * \return New topic instance.
 * \return \c NULL on error.
 */
struct stasis_topic *stasis_topic_create(const char *name);

/*!
 * \brief Return the name of a topic.
 * \param topic Topic.
 * \return Name of the topic.
 * \return \c NULL if \a topic is \c NULL.
 */
const char *stasis_topic_name(const struct stasis_topic *topic);

/*!
 * \brief Publish a message to a topic's subscribers.
 * \param topic Topic.
 * \param message Message to publish.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);

/*!
 * \brief Publish a message from a specified topic to all the subscribers of a
 * possibly different topic.
 * \param topic Topic to publish message to.
 * \param publisher_topic Original topic message was from.
 * \param message Message
 */
void stasis_forward_message(struct stasis_topic *topic,
	struct stasis_topic *publisher_topic,
	struct stasis_message *message);

/*!
 * \brief Opaque type for a Stasis subscription.
 */
struct stasis_subscription;

/*!
 * \brief Callback function type for Stasis subscriptions.
 * \param data Data field provided with subscription.
 * \param sub Subscription on which the message was received.
 * \param topic Topic to which the message was published.
 * \param message Published message.
 */
typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);

/*!
 * \brief Create a subscription.
 *
 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
 * up this reference), the subscription must be explicitly unsubscribed from its
 * topic using stasis_unsubscribe().
 *
 * The invocations of the callback are serialized, but may not always occur on
 * the same thread. The invocation order of different subscriptions is
 * unspecified.
 *
 * \param topic Topic to subscribe to.
 * \param callback Callback function for subscription messages.
 * \param data Data to be passed to the callback, in addition to the message.
 * \return New \ref stasis_subscription object.
 * \return \c NULL on error.
 */
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
	stasis_subscription_cb callback, void *data);

/*!
 * \brief Cancel a subscription.
 *
 * Note that in an asynchronous system, there may still be messages queued or
 * in transit to the subscription's callback. These will still be delivered.
 * There will be a final 'SubscriptionCancelled' message, indicating the
 * delivery of the final message.
 *
 * \param subscription Subscription to cancel.
 * \return \c NULL for convenience
 */
struct stasis_subscription *stasis_unsubscribe(
	struct stasis_subscription *subscription);

/*!
 * \brief Block until the last message is processed on a subscription.
 *
 * This function will not return until the \a subscription's callback for the
 * stasis_subscription_final_message() completes. This allows cleanup routines
 * to run before unblocking the joining thread.
 *
 * \param subscription Subscription to block on.
 */
void stasis_subscription_join(struct stasis_subscription *subscription);

/*!
 * \brief Returns whether \a subscription has received its final message.
 *
 * Note that a subscription is considered done even while the
 * stasis_subscription_final_message() is being processed. This allows cleanup
 * routines to check the status of the subscription.
 *
 * \param subscription Subscription.
 * \return True (non-zero) if stasis_subscription_final_message() has been
 * received.
 * \return False (zero) if waiting for the end.
 */
int stasis_subscription_is_done(struct stasis_subscription *subscription);

/*!
 * \brief Cancel a subscription, blocking until the last message is processed.
 *
 * While normally it's recommended to stasis_unsubscribe() and wait for
 * stasis_subscription_final_message(), there are times (like during a module
 * unload) where you have to wait for the final message (otherwise you'll call
 * a function in a shared module that no longer exists).
 *
 * \param subscription Subscription to cancel.
 * \return \c NULL for convenience
 */
struct stasis_subscription *stasis_unsubscribe_and_join(
	struct stasis_subscription *subscription);

/*!
 * \brief Create a subscription which forwards all messages from one topic to
 * another.
 *
 * Note that the \a topic parameter of the invoked callback will be the topic
 * the message was sent to, not the topic the subscriber subscribed to.
 *
 * \param from_topic Topic to forward.
 * \param to_topic Destination topic of forwarded messages.
 * \return New forwarding subscription.
 * \return \c NULL on error.
 */
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
	struct stasis_topic *to_topic);

/*!
 * \brief Get the unique ID for the subscription.
 *
 * \param sub Subscription for which to get the unique ID.
 * \return Unique ID for the subscription.
 */
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);

/*!
 * \brief Returns whether a subscription is currently subscribed.
 *
 * Note that there may still be messages queued up to be dispatched to this
 * subscription, but the stasis_subscription_final_message() has been enqueued.
 *
 * \param sub Subscription to check
 * \return False (zero) if subscription is not subscribed.
 * \return True (non-zero) if still subscribed.
 */
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);

/*!
 * \brief Determine whether a message is the final message to be received on a subscription.
 *
 * \param sub Subscription on which the message was received.
 * \param msg Message to check.
 * \return zero if the provided message is not the final message.
 * \return non-zero if the provided message is the final message.
 */
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);

/*! \addtogroup StasisTopicsAndMessages
 * @{
 */

/*!
 * \brief Holds details about changes to subscriptions for the specified topic
 */
struct stasis_subscription_change {
	AST_DECLARE_STRING_FIELDS(
		AST_STRING_FIELD(uniqueid);	/*!< The unique ID associated with this subscription */
		AST_STRING_FIELD(description);	/*!< The description of the change to the subscription associated with the uniqueid */
	);
	struct stasis_topic *topic;		/*!< The topic the subscription is/was subscribing to */
};

/*!
 * \brief Gets the message type for subscription change notices
 * \return The stasis_message_type for subscription change notices
 */
struct stasis_message_type *stasis_subscription_change_type(void);

/*! @} */

/*!
 * \brief Pool for topic aggregation
 */
struct stasis_topic_pool;

/*!
 * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
 * \param pooled_topic Topic to which messages will be routed
 * \return the new stasis_topic_pool
 * \return \c NULL on failure
 */
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);

/*!
 * \brief Find or create a topic in the pool
 * \param pool Pool for which to get the topic
 * \param topic_name Name of the topic to get
 * \return The already stored or newly allocated topic
 * \return \c NULL if the topic was not found and could not be allocated
 */
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);

/*! \addtogroup StasisTopicsAndMessages
 * @{
 */

/*!
 * \brief Message type for cache update messages.
 * \return Message type for cache update messages.
 */
struct stasis_message_type *stasis_cache_update_type(void);

/*!
 * \brief Cache update message
 */
struct stasis_cache_update {
	/*! \brief Topic that published \c new_snapshot */
	struct stasis_topic *topic;
	/*! \brief Convenience reference to snapshot type */
	struct stasis_message_type *type;
	/*! \brief Old value from the cache */
	struct stasis_message *old_snapshot;
	/*! \brief New value */
	struct stasis_message *new_snapshot;
};

/*!
 * \brief Cache clear message.
 */
struct stasis_cache_clear {
	/*! Type of object being cleared from the cache */
	struct stasis_message_type *type;
	/*! Id of the object being cleared from the cache */
	const char *id;
};

/*!
 * \brief Message type for \ref stasis_cache_clear.
 */
struct stasis_message_type *stasis_cache_clear_type(void);

/*!
 * \brief A message which instructs the caching topic to remove an entry from its cache.
 * \param type Message type.
 * \param id Unique id of the snapshot to clear.
 * \return Message which, when sent to the topic, will clear the item from the cache.
 * \return \c NULL on error.
 */
struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);

/*! @} */

/*!
 * \brief A topic wrapper, which caches certain messages.
 */
struct stasis_caching_topic;

/*!
 * \brief Callback to extract a unique identity from a snapshot message.
 *
 * This identity is unique to the underlying object of the snapshot, such as the
 * UniqueId field of a channel.
 *
 * \param message Message to extract id from.
 * \return String representing the snapshot's id.
 * \return \c NULL if the message_type of the message isn't a handled snapshot.
 */
typedef const char *(*snapshot_get_id)(struct stasis_message *message);

/*!
 * \brief Create a topic which monitors and caches messages from another topic.
 *
 * The idea is that some topics publish 'snapshots' of some other object's state
 * that should be cached. When these snapshot messages are received, the cache
 * is updated, and a \ref stasis_cache_update message is forwarded, which has
 * both the original snapshot message and the new message.
 *
 * \param original_topic Topic publishing snapshot messages.
 * \param id_fn Callback to extract the id from a snapshot message.
 * \return New topic which changes snapshot messages to \ref stasis_cache_update
 *         messages, and forwards all other messages from the original topic.
 */
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);

/*!
 * \brief Unsubscribes a caching topic from its upstream topic.
 *
 * This function returns immediately, so be sure to clean up when
 * stasis_subscription_final_message() is received.
 *
 * \param caching_topic Caching topic to unsubscribe
 * \return \c NULL for convenience
 */
struct stasis_caching_topic *stasis_caching_unsubscribe(
	struct stasis_caching_topic *caching_topic);

/*!
 * \brief Unsubscribes a caching topic from its upstream topic, blocking until
 * all messages have been forwarded.
 *
 * See stasis_unsubscribe_and_join() for more info on when to use this as
 * opposed to stasis_caching_unsubscribe().
 *
 * \param caching_topic Caching topic to unsubscribe
 * \return \c NULL for convenience
 */
struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
	struct stasis_caching_topic *caching_topic);

/*!
 * \brief Returns the topic of cached events from a caching topic.
 * \param caching_topic The caching topic.
 * \return The topic that publishes cache update events, along with passthrough events
 *         from the underlying topic.
 * \return \c NULL if \a caching_topic is \c NULL.
 */
struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);

/*!
 * \brief Retrieve an item from the cache.
 * \param caching_topic The topic returned from stasis_caching_topic_create().
 * \param type Type of message to retrieve.
 * \param id Identity of the snapshot to retrieve.
 * \return Message from the cache. The cache still owns the message, so
 *         ao2_ref() if you want to keep it.
 * \return \c NULL if message is not found.
 */
struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
	struct stasis_message_type *type,
	const char *id);

/*!
 * \brief Dump cached items into a container.
 * \param caching_topic The topic returned from stasis_caching_topic_create().
 * \param type Type of message to dump (any type if \c NULL).
 * \return ao2_container containing all matches (must be unreffed by caller)
 * \return \c NULL on allocation error
 */
struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
	struct stasis_message_type *type);

/*!
 * \brief Log a message about invalid attempt to access a type.
 */
void stasis_log_bad_type_access(const char *name);

/*!
 * \brief Boiler-plate removing macro for defining message types.
 *
 * \param name Name of message type.
 */
#define STASIS_MESSAGE_TYPE_DEFN(name) \
	static struct stasis_message_type *_priv_ ## name; \
	struct stasis_message_type *name(void) { \
		if (_priv_ ## name == NULL) { \
			stasis_log_bad_type_access(#name); \
		} \
		return _priv_ ## name; \
	}

/*!
 * \brief Boiler-plate removing macro for initializing message types.
 *
 * \param name Name of message type.
 * \return 0 if initialization is successful.
 * \return Non-zero on failure.
 */
#define STASIS_MESSAGE_TYPE_INIT(name) \
	({ \
		ast_assert(_priv_ ## name == NULL); \
		_priv_ ## name = stasis_message_type_create(#name); \
		_priv_ ## name ? 0 : -1; \
	})

/*!
 * \brief Boiler-plate removing macro for cleaning up message types.
 *
 * Note that if your type is defined in core instead of a loadable module, you
 * should call message type cleanup from an ast_register_cleanup() handler
 * instead of an ast_register_atexit() handler.
 *
 * The reason is that during an immediate shutdown, loadable modules (which may
 * refer to core message types) are not unloaded. While the atexit handlers are
 * run, there's a window of time where a module subscription might reference a
 * core message type after it's been cleaned up. Which is bad.
 *
 * \param name Name of message type.
 */
#define STASIS_MESSAGE_TYPE_CLEANUP(name) \
	({ \
		ao2_cleanup(_priv_ ## name); \
		_priv_ ## name = NULL; \
	})

/*!
 * \brief Initialize the Stasis subsystem
 * \return 0 on success.
 * \return Non-zero on error.
 */
int stasis_init(void);

/*!
 * \brief Called by stasis_init() for cache initialization.
 * \return 0 on success.
 * \return Non-zero on error.
 */
int stasis_cache_init(void);

/*!
 * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
 *
 * This group contains the topics, messages and corresponding message types
 * found within Asterisk.
 */

#endif /* _ASTERISK_STASIS_H */