2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
19 #ifndef _ASTERISK_STASIS_H
20 #define _ASTERISK_STASIS_H
24 * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25 * detailed documentation.
27 * \author David M. Lee, II <dlee@digium.com>
30 * \page stasis Stasis Message Bus API
34 * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35 * within Asterisk. It is designed to be:
36 * - Loosely coupled; new message types can be added in seperate modules.
37 * - Easy to use; publishing and subscribing are straightforward operations.
39 * There are three main concepts for using the Stasis Message Bus:
40 * - \ref stasis_message
42 * - \ref stasis_subscription
46 * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47 * that are sent on the bus. These messages have:
48 * - a type (as defined by a \ref stasis_message_type)
49 * - a value - a \c void pointer to an AO2 object
50 * - a timestamp when it was created
52 * Once a \ref stasis_message has been created, it is immutable and cannot
53 * change. The same goes for the value of the message (although this cannot be
54 * enforced in code). Messages themselves are reference-counted, AO2 objects,
55 * along with their values. By being both reference counted and immutable,
56 * messages can be shared throughout the system without any concerns for
59 * The type of a message is defined by an instance of \ref stasis_message_type,
60 * which can be created by calling stasis_message_type_create(). Message types
61 * are named, which is useful in debugging. It is recommended that the string
62 * name for a message type match the name of the struct that's stored in the
63 * message. For example, name for \ref stasis_cache_update's message type is \c
64 * "stasis_cache_update".
68 * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
69 * subscribed, and \ref stasis_message's may be published. Any message published
70 * to the topic is dispatched to all of its subscribers. The topic itself may be
71 * named, which is useful in debugging.
73 * Topics themselves are reference counted objects. Since topics are referred to
74 * by their subscibers, they will not be freed until all of their subscribers
75 * have unsubscribed. Topics are also thread safe, so no worries about
76 * publishing/subscribing/unsubscribing to a topic concurrently from multiple
77 * threads. It's also designed to handle the case of unsubscribing from a topic
78 * from within the subscription handler.
82 * There is one special case of topics that's worth noting: forwarding
83 * messages. It's a fairly common use case to want to forward all the messages
84 * published on one topic to another one (for example, an aggregator topic that
85 * publishes all the events from a set of other topics). This can be
86 * accomplished easily using stasis_forward_all(). This sets up the forwarding
87 * between the two topics, and returns a \ref stasis_subscription, which can be
88 * unsubscribed to stop the forwarding.
92 * Another common use case is to want to cache certain messages that are
93 * published on the bus. Usually these events are snapshots of the current state
94 * in the system, and it's desirable to query that state from the cache without
95 * locking the original object. It's also desirable for subscribers of the
96 * caching topic to receive messages that have both the old cache value and the
97 * new value being put into the cache. For this, we have stasis_cache_create()
98 * and stasis_caching_topic_create(), providing them with the topic which
99 * publishes the messages that you wish to cache, and a function that can
100 * identify cacheable messages.
102 * The \ref stasis_cache is designed so that it may be shared amongst several
103 * \ref stasis_caching_topic objects. This allows you to have individual caching
104 * topics per-object (i.e. so you can subscribe to updates for a single object),
105 * and still have a single cache to query for the state of all objects. While a
106 * cache may be shared amongst different message types, such a usage is probably
109 * The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
110 * It's a thread safe container, so freely use the stasis_cache_get() and
111 * stasis_cache_dump() to query the cache.
113 * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
114 * message is wrapped in a \ref stasis_cache_update message which provides the
115 * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
116 * (or \c NULL if the entry was removed from the cache). A
117 * stasis_cache_clear_create() message must be sent to the topic in order to
118 * remove entries from the cache.
120 * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
121 * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
122 * stasis_caching_topic will not be freed until after it has been unsubscribed,
123 * and all other ao2_ref()'s have been cleaned up.
125 * The \ref stasis_cache object is a normal AO2 managed object, which can be
126 * release with ao2_cleanup().
128 * \par stasis_subscriber
130 * Any topic may be subscribed to by simply providing stasis_subscribe() the
131 * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
132 * data that is passed back to the handler. Invocations on the subscription's
133 * handler are serialized, but different invocations may occur on different
134 * threads (this usually isn't important unless you use thread locals or
135 * something similar).
137 * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
138 * stasis_subscription. Due to cyclic references, the \ref
139 * stasis_subscription will not be freed until after it has been unsubscribed,
140 * and all other ao2_ref()'s have been cleaned up.
144 * Subscriptions have two options for unsubscribing, depending upon the context
145 * in which you need to unsubscribe.
147 * If your subscription is owned by a module, and you must unsubscribe from the
148 * module_unload() function, then you'll want to use the
149 * stasis_unsubscribe_and_join() function. This will block until the final
150 * message has been received on the subscription. Otherwise, there's the danger
151 * of invoking the callback function after it has been unloaded.
153 * If your subscription is owned by an object, then your object should have an
154 * explicit shutdown() function, which calls stasis_unsubscribe(). In your
155 * subscription handler, when the stasis_subscription_final_message() has been
156 * received, decrement the refcount on your object. In your object's destructor,
157 * you may assert that stasis_subscription_is_done() to validate that the
158 * subscription's callback will no longer be invoked.
160 * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
161 * an object's destructor. While code that does this may work most of the time,
162 * it's got one big downside. There's a general assumption that object
163 * destruction is non-blocking. If you block the destruction waiting for the
164 * subscription to complete, there's the danger that the subscription may
165 * process a message which will bump the refcount up by one. Then it does
166 * whatever it does, decrements the refcount, which then proceeds to re-destroy
167 * the object. Now you've got hard to reproduce bugs that only show up under
171 #include "asterisk/json.h"
172 #include "asterisk/manager.h"
173 #include "asterisk/utils.h"
174 #include "asterisk/event.h"
177 * \brief Metadata about a \ref stasis_message.
180 struct stasis_message_type;
183 * \brief Opaque type for a Stasis message.
186 struct stasis_message;
189 * \brief Opaque type for a Stasis subscription.
192 struct stasis_subscription;
195 * \brief Structure containing callbacks for Stasis message sanitization
197 * \note If either callback is implemented, both should be implemented since
198 * not all callers may have access to the full snapshot.
200 struct stasis_message_sanitizer {
202 * \brief Callback which determines whether a channel should be sanitized from
203 * a message based on the channel's unique ID
205 * \param channel_id The unique ID of the channel
207 * \retval non-zero if the channel should be left out of the message
208 * \retval zero if the channel should remain in the message
210 int (*channel_id)(const char *channel_id);
213 * \brief Callback which determines whether a channel should be sanitized from
214 * a message based on the channel's snapshot
216 * \param snapshot A snapshot generated from the channel
218 * \retval non-zero if the channel should be left out of the message
219 * \retval zero if the channel should remain in the message
221 int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
224 * \brief Callback which determines whether a channel should be sanitized from
225 * a message based on the channel
227 * \param chan The channel to be checked
229 * \retval non-zero if the channel should be left out of the message
230 * \retval zero if the channel should remain in the message
232 int (*channel)(const struct ast_channel *chan);
236 * \brief Virtual table providing methods for messages.
239 struct stasis_message_vtable {
241 * \brief Build the JSON representation of the message.
243 * May be \c NULL, or may return \c NULL, to indicate no representation.
244 * The returned object should be ast_json_unref()'ed.
246 * \param message Message to convert to JSON string.
247 * \param sanitize Snapshot sanitization callback.
249 * \return Newly allocated JSON message.
250 * \return \c NULL on error.
251 * \return \c NULL if JSON format is not supported.
253 struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
256 * \brief Build the AMI representation of the message.
258 * May be \c NULL, or may return \c NULL, to indicate no representation.
259 * The returned object should be ao2_cleanup()'ed.
261 * \param message Message to convert to AMI string.
262 * \return Newly allocated \ref ast_manager_event_blob.
263 * \return \c NULL on error.
264 * \return \c NULL if AMI format is not supported.
266 struct ast_manager_event_blob *(*to_ami)(
267 struct stasis_message *message);
271 * \brief Build the \ref ast_event representation of the message.
273 * May be \c NULL, or may return \c NULL, to indicate no representation.
274 * The returned object should be free'd.
276 * \param message Message to convert to an \ref ast_event.
277 * \return Newly allocated \ref ast_event.
278 * \return \c NULL on error.
279 * \return \c NULL if AMI format is not supported.
281 struct ast_event *(*to_event)(
282 struct stasis_message *message);
286 * \brief Return code for Stasis message type creation attempts
288 enum stasis_message_type_result {
289 STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
290 STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
291 STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
295 * \brief Create a new message type.
297 * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
300 * \param name Name of the new type.
301 * \param vtable Virtual table of message methods. May be \c NULL.
302 * \param[out] result The location where the new message type will be placed
304 * \note Stasis message type creation may be declined if the message type is disabled
306 * \returns A stasis_message_type_result enum
309 enum stasis_message_type_result stasis_message_type_create(const char *name,
310 struct stasis_message_vtable *vtable, struct stasis_message_type **result);
313 * \brief Gets the name of a given message type
314 * \param type The type to get.
315 * \return Name of the type.
316 * \return \c NULL if \a type is \c NULL.
319 const char *stasis_message_type_name(const struct stasis_message_type *type);
322 * \brief Check whether a message type is declined
324 * \param name The name of the message type to check
326 * \retval zero The message type is not declined
327 * \retval non-zero The message type is declined
329 int stasis_message_type_declined(const char *name);
332 * \brief Create a new message.
334 * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
335 * with it. Messages are also immutable, and must not be modified after they
336 * are initialized. Especially the \a data in the message.
338 * \param type Type of the message
339 * \param data Immutable data that is the actual contents of the message
341 * \return New message
342 * \return \c NULL on error
346 struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
349 * \brief Create a new message for an entity.
351 * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
352 * with it. Messages are also immutable, and must not be modified after they
353 * are initialized. Especially the \a data in the message.
355 * \param type Type of the message
356 * \param data Immutable data that is the actual contents of the message
357 * \param eid What entity originated this message. (NULL for aggregate)
359 * \note An aggregate message is a combined representation of the local
360 * and remote entities publishing the message data. e.g., An aggregate
361 * device state represents the combined device state from the local and
362 * any remote entities publishing state for a device. e.g., An aggregate
363 * MWI message is the old/new MWI counts accumulated from the local and
364 * any remote entities publishing to a mailbox.
366 * \retval New message
367 * \retval \c NULL on error
371 struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
374 * \brief Get the entity id for a \ref stasis_message.
377 * \param msg Message to get eid.
379 * \retval Entity id of \a msg
380 * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
382 const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
385 * \brief Get the message type for a \ref stasis_message.
386 * \param msg Message to type
387 * \return Type of \a msg
388 * \return \c NULL if \a msg is \c NULL.
391 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
394 * \brief Get the data contained in a message.
395 * \param msg Message.
396 * \return Immutable data pointer
397 * \return \c NULL if msg is \c NULL.
400 void *stasis_message_data(const struct stasis_message *msg);
403 * \brief Get the time when a message was created.
404 * \param msg Message.
405 * \return Pointer to the \a timeval when the message was created.
406 * \return \c NULL if msg is \c NULL.
409 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
412 * \brief Build the JSON representation of the message.
414 * May return \c NULL, to indicate no representation. The returned object should
415 * be ast_json_unref()'ed.
417 * \param message Message to convert to JSON string.
418 * \param sanitize Snapshot sanitization callback.
420 * \return Newly allocated string with JSON message.
421 * \return \c NULL on error.
422 * \return \c NULL if JSON format is not supported.
424 struct ast_json *stasis_message_to_json(struct stasis_message *message, struct stasis_message_sanitizer *sanitize);
427 * \brief Build the AMI representation of the message.
429 * May return \c NULL, to indicate no representation. The returned object should
430 * be ao2_cleanup()'ed.
432 * \param message Message to convert to AMI.
433 * \return \c NULL on error.
434 * \return \c NULL if AMI format is not supported.
436 struct ast_manager_event_blob *stasis_message_to_ami(
437 struct stasis_message *message);
440 * \brief Build the \ref AstGenericEvents representation of the message.
442 * May return \c NULL, to indicate no representation. The returned object should
443 * be disposed of via \ref ast_event_destroy.
445 * \param message Message to convert to AMI.
446 * \return \c NULL on error.
447 * \return \c NULL if AMI format is not supported.
449 struct ast_event *stasis_message_to_event(
450 struct stasis_message *message);
453 * \brief A topic to which messages may be posted, and subscribers, well, subscribe
459 * \brief Create a new topic.
460 * \param name Name of the new topic.
461 * \return New topic instance.
462 * \return \c NULL on error.
465 struct stasis_topic *stasis_topic_create(const char *name);
468 * \brief Return the name of a topic.
469 * \param topic Topic.
470 * \return Name of the topic.
471 * \return \c NULL if topic is \c NULL.
474 const char *stasis_topic_name(const struct stasis_topic *topic);
477 * \brief Publish a message to a topic's subscribers.
478 * \param topic Topic.
479 * \param message Message to publish.
481 * This call is asynchronous and will return immediately upon queueing
482 * the message for delivery to the topic's subscribers.
486 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
489 * \brief Publish a message to a topic's subscribers, synchronizing
490 * on the specified subscriber
491 * \param sub Subscription to synchronize on.
492 * \param message Message to publish.
494 * The caller of stasis_publish_sync will block until the specified
495 * subscriber completes handling of the message.
497 * All other subscribers to the topic the \ref stasis_subpscription
498 * is subscribed to are also delivered the message; this delivery however
499 * happens asynchronously.
503 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
506 * \brief Callback function type for Stasis subscriptions.
507 * \param data Data field provided with subscription.
508 * \param message Published message.
511 typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
514 * \brief Stasis subscription callback function that does nothing.
516 * \note This callback should be used for events are not directly processed, but need
517 * to be generated so data can be retrieved from cache later. Subscriptions with this
518 * callback can be released with \ref stasis_unsubscribe, even during module unload.
522 void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message);
525 * \brief Create a subscription.
527 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
528 * up this reference), the subscription must be explicitly unsubscribed from its
529 * topic using stasis_unsubscribe().
531 * The invocations of the callback are serialized, but may not always occur on
532 * the same thread. The invocation order of different subscriptions is
535 * \param topic Topic to subscribe to.
536 * \param callback Callback function for subscription messages.
537 * \param data Data to be passed to the callback, in addition to the message.
538 * \return New \ref stasis_subscription object.
539 * \return \c NULL on error.
542 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
543 stasis_subscription_cb callback, void *data);
546 * \brief Create a subscription whose callbacks occur on a thread pool
548 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
549 * up this reference), the subscription must be explicitly unsubscribed from its
550 * topic using stasis_unsubscribe().
552 * The invocations of the callback are serialized, but will almost certainly not
553 * always happen on the same thread. The invocation order of different subscriptions
556 * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
557 * dispatch items to its \c callback. This form of subscription should be used
558 * when many subscriptions may be made to the specified \c topic.
560 * \param topic Topic to subscribe to.
561 * \param callback Callback function for subscription messages.
562 * \param data Data to be passed to the callback, in addition to the message.
563 * \return New \ref stasis_subscription object.
564 * \return \c NULL on error.
567 struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
568 stasis_subscription_cb callback, void *data);
571 * \brief Cancel a subscription.
573 * Note that in an asynchronous system, there may still be messages queued or
574 * in transit to the subscription's callback. These will still be delivered.
575 * There will be a final 'SubscriptionCancelled' message, indicating the
576 * delivery of the final message.
578 * \param subscription Subscription to cancel.
579 * \return \c NULL for convenience
582 struct stasis_subscription *stasis_unsubscribe(
583 struct stasis_subscription *subscription);
586 * \brief Block until the last message is processed on a subscription.
588 * This function will not return until the \a subscription's callback for the
589 * stasis_subscription_final_message() completes. This allows cleanup routines
590 * to run before unblocking the joining thread.
592 * \param subscription Subscription to block on.
595 void stasis_subscription_join(struct stasis_subscription *subscription);
598 * \brief Returns whether \a subscription has received its final message.
600 * Note that a subscription is considered done even while the
601 * stasis_subscription_final_message() is being processed. This allows cleanup
602 * routines to check the status of the subscription.
604 * \param subscription Subscription.
605 * \return True (non-zero) if stasis_subscription_final_message() has been
607 * \return False (zero) if waiting for the end.
609 int stasis_subscription_is_done(struct stasis_subscription *subscription);
612 * \brief Cancel a subscription, blocking until the last message is processed.
614 * While normally it's recommended to stasis_unsubscribe() and wait for
615 * stasis_subscription_final_message(), there are times (like during a module
616 * unload) where you have to wait for the final message (otherwise you'll call
617 * a function in a shared module that no longer exists).
619 * \param subscription Subscription to cancel.
620 * \return \c NULL for convenience
623 struct stasis_subscription *stasis_unsubscribe_and_join(
624 struct stasis_subscription *subscription);
626 struct stasis_forward;
629 * \brief Create a subscription which forwards all messages from one topic to
632 * Note that the \a topic parameter of the invoked callback will the be the
633 * \a topic the message was sent to, not the topic the subscriber subscribed to.
635 * \param from_topic Topic to forward.
636 * \param to_topic Destination topic of forwarded messages.
637 * \return New forwarding subscription.
638 * \return \c NULL on error.
641 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
642 struct stasis_topic *to_topic);
644 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
647 * \brief Get the unique ID for the subscription.
649 * \param sub Subscription for which to get the unique ID.
650 * \return Unique ID for the subscription.
653 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
656 * \brief Returns whether a subscription is currently subscribed.
658 * Note that there may still be messages queued up to be dispatched to this
659 * subscription, but the stasis_subscription_final_message() has been enqueued.
661 * \param sub Subscription to check
662 * \return False (zero) if subscription is not subscribed.
663 * \return True (non-zero) if still subscribed.
665 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
668 * \brief Determine whether a message is the final message to be received on a subscription.
670 * \param sub Subscription on which the message was received.
671 * \param msg Message to check.
672 * \return zero if the provided message is not the final message.
673 * \return non-zero if the provided message is the final message.
676 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
678 /*! \addtogroup StasisTopicsAndMessages
683 * \brief Holds details about changes to subscriptions for the specified topic
686 struct stasis_subscription_change {
687 AST_DECLARE_STRING_FIELDS(
688 AST_STRING_FIELD(uniqueid); /*!< The unique ID associated with this subscription */
689 AST_STRING_FIELD(description); /*!< The description of the change to the subscription associated with the uniqueid */
691 struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */
695 * \brief Gets the message type for subscription change notices
696 * \return The stasis_message_type for subscription change notices
699 struct stasis_message_type *stasis_subscription_change_type(void);
704 * \brief Pool for topic aggregation
706 struct stasis_topic_pool;
709 * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
710 * \param pooled_topic Topic to which messages will be routed
711 * \return the new stasis_topic_pool
712 * \return \c NULL on failure
714 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
717 * \brief Find or create a topic in the pool
718 * \param pool Pool for which to get the topic
719 * \param topic_name Name of the topic to get
720 * \return The already stored or newly allocated topic
721 * \return \c NULL if the topic was not found and could not be allocated
723 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
725 /*! \addtogroup StasisTopicsAndMessages
730 * \brief Message type for cache update messages.
731 * \return Message type for cache update messages.
734 struct stasis_message_type *stasis_cache_update_type(void);
737 * \brief Cache update message
740 struct stasis_cache_update {
741 /*! \brief Convenience reference to snapshot type */
742 struct stasis_message_type *type;
743 /*! \brief Old value from the cache */
744 struct stasis_message *old_snapshot;
745 /*! \brief New value */
746 struct stasis_message *new_snapshot;
750 * \brief Message type for clearing a message from a stasis cache.
753 struct stasis_message_type *stasis_cache_clear_type(void);
758 * \brief A message cache, for use with \ref stasis_caching_topic.
763 /*! Cache entry used for calculating the aggregate snapshot. */
764 struct stasis_cache_entry;
767 * \brief A topic wrapper, which caches certain messages.
770 struct stasis_caching_topic;
774 * \brief Callback extract a unique identity from a snapshot message.
776 * This identity is unique to the underlying object of the snapshot, such as the
777 * UniqueId field of a channel.
779 * \param message Message to extract id from.
780 * \return String representing the snapshot's id.
781 * \return \c NULL if the message_type of the message isn't a handled snapshot.
784 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
787 * \brief Callback to calculate the aggregate cache entry.
790 * \param entry Cache entry to calculate a new aggregate snapshot.
791 * \param new_snapshot The shapshot that is being updated.
793 * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
794 * if a new aggregate could not be calculated because of error.
796 * \note An aggregate message is a combined representation of the local
797 * and remote entities publishing the message data. e.g., An aggregate
798 * device state represents the combined device state from the local and
799 * any remote entities publishing state for a device. e.g., An aggregate
800 * MWI message is the old/new MWI counts accumulated from the local and
801 * any remote entities publishing to a mailbox.
803 * \return New aggregate-snapshot calculated on success.
804 * Caller has a reference on return.
806 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
809 * \brief Callback to publish the aggregate cache entry message.
813 * Once an aggregate message is calculated. This callback publishes the
814 * message so subscribers will know the new value of an aggregated state.
816 * \param topic The aggregate message may be published to this topic.
817 * It is the topic to which the cache itself is subscribed.
818 * \param aggregate The aggregate shapshot message to publish.
820 * \note It is up to the function to determine if there is a better topic
821 * the aggregate message should be published over.
823 * \note An aggregate message is a combined representation of the local
824 * and remote entities publishing the message data. e.g., An aggregate
825 * device state represents the combined device state from the local and
826 * any remote entities publishing state for a device. e.g., An aggregate
827 * MWI message is the old/new MWI counts accumulated from the local and
828 * any remote entities publishing to a mailbox.
832 typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
835 * \brief Get the aggregate cache entry snapshot.
838 * \param entry Cache entry to get the aggregate snapshot.
840 * \note A reference is not given to the returned pointer so don't unref it.
842 * \note An aggregate message is a combined representation of the local
843 * and remote entities publishing the message data. e.g., An aggregate
844 * device state represents the combined device state from the local and
845 * any remote entities publishing state for a device. e.g., An aggregate
846 * MWI message is the old/new MWI counts accumulated from the local and
847 * any remote entities publishing to a mailbox.
849 * \retval Aggregate-snapshot in cache.
850 * \retval NULL if not present.
852 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
855 * \brief Get the local entity's cache entry snapshot.
858 * \param entry Cache entry to get the local entity's snapshot.
860 * \note A reference is not given to the returned pointer so don't unref it.
862 * \retval Internal-snapshot in cache.
863 * \retval NULL if not present.
865 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
868 * \brief Get a remote entity's cache entry snapshot by index.
871 * \param entry Cache entry to get a remote entity's snapshot.
872 * \param idx Which remote entity's snapshot to get.
874 * \note A reference is not given to the returned pointer so don't unref it.
876 * \retval Remote-entity-snapshot in cache.
877 * \retval NULL if not present.
879 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
882 * \brief Create a cache.
884 * This is the backend store for a \ref stasis_caching_topic. The cache is
885 * thread safe, allowing concurrent reads and writes.
887 * The returned object is AO2 managed, so ao2_cleanup() when you're done.
889 * \param id_fn Callback to extract the id from a snapshot message.
891 * \retval New cache indexed by \a id_fn.
892 * \retval \c NULL on error
896 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
899 * \brief Create a cache.
901 * This is the backend store for a \ref stasis_caching_topic. The cache is
902 * thread safe, allowing concurrent reads and writes.
904 * The returned object is AO2 managed, so ao2_cleanup() when you're done.
906 * \param id_fn Callback to extract the id from a snapshot message.
907 * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
908 * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
910 * \note An aggregate message is a combined representation of the local
911 * and remote entities publishing the message data. e.g., An aggregate
912 * device state represents the combined device state from the local and
913 * any remote entities publishing state for a device. e.g., An aggregate
914 * MWI message is the old/new MWI counts accumulated from the local and
915 * any remote entities publishing to a mailbox.
917 * \retval New cache indexed by \a id_fn.
918 * \retval \c NULL on error
922 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
925 * \brief Create a topic which monitors and caches messages from another topic.
927 * The idea is that some topics publish 'snapshots' of some other object's state
928 * that should be cached. When these snapshot messages are received, the cache
929 * is updated, and a stasis_cache_update() message is forwarded, which has both
930 * the original snapshot message and the new message.
932 * The returned object is AO2 managed, so ao2_cleanup() when done with it.
934 * \param original_topic Topic publishing snapshot messages.
935 * \param cache Backend cache in which to keep snapshots.
936 * \return New topic which changes snapshot messages to stasis_cache_update()
937 * messages, and forwards all other messages from the original topic.
938 * \return \c NULL on error
941 struct stasis_caching_topic *stasis_caching_topic_create(
942 struct stasis_topic *original_topic, struct stasis_cache *cache);
945 * \brief Unsubscribes a caching topic from its upstream topic.
947 * This function returns immediately, so be sure to cleanup when
948 * stasis_subscription_final_message() is received.
950 * \param caching_topic Caching topic to unsubscribe
951 * \return \c NULL for convenience
954 struct stasis_caching_topic *stasis_caching_unsubscribe(
955 struct stasis_caching_topic *caching_topic);
958 * \brief Unsubscribes a caching topic from its upstream topic, blocking until
959 * all messages have been forwarded.
961 * See stasis_unsubscriben_and_join() for more info on when to use this as
962 * opposed to stasis_caching_unsubscribe().
964 * \param caching_topic Caching topic to unsubscribe
965 * \return \c NULL for convenience
968 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
969 struct stasis_caching_topic *caching_topic);
972 * \brief Returns the topic of cached events from a caching topics.
973 * \param caching_topic The caching topic.
974 * \return The topic that publishes cache update events, along with passthrough
975 * events from the underlying topic.
976 * \return \c NULL if \a caching_topic is \c NULL.
979 struct stasis_topic *stasis_caching_get_topic(
980 struct stasis_caching_topic *caching_topic);
983 * \brief A message which instructs the caching topic to remove an entry from
986 * \param message Message representative of the cache entry that should be
987 * cleared. This will become the data held in the
988 * stasis_cache_clear message.
990 * \return Message which, when sent to a \ref stasis_caching_topic, will clear
991 * the item from the cache.
992 * \return \c NULL on error.
995 struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
998 * \brief Retrieve an item from the cache for the ast_eid_default entity.
1000 * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1002 * \param cache The cache to query.
1003 * \param type Type of message to retrieve.
1004 * \param id Identity of the snapshot to retrieve.
1006 * \retval Message from the cache.
1007 * \retval \c NULL if message is not found.
1011 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1014 * \brief Retrieve an item from the cache for a specific entity.
1016 * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1018 * \param cache The cache to query.
1019 * \param type Type of message to retrieve.
1020 * \param id Identity of the snapshot to retrieve.
1021 * \param eid Specific entity id to retrieve. NULL for aggregate.
1023 * \note An aggregate message is a combined representation of the local
1024 * and remote entities publishing the message data. e.g., An aggregate
1025 * device state represents the combined device state from the local and
1026 * any remote entities publishing state for a device. e.g., An aggregate
1027 * MWI message is the old/new MWI counts accumulated from the local and
1028 * any remote entities publishing to a mailbox.
1030 * \retval Message from the cache.
1031 * \retval \c NULL if message is not found.
1035 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
1038 * \brief Retrieve all matching entity items from the cache.
1041 * \param cache The cache to query.
1042 * \param type Type of message to retrieve.
1043 * \param id Identity of the snapshot to retrieve.
1045 * \retval Container of matching items found.
1046 * \retval \c NULL if error.
1048 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1051 * \brief Dump cached items to a subscription for the ast_eid_default entity.
1053 * \param cache The cache to query.
1054 * \param type Type of message to dump (any type if \c NULL).
1056 * \retval ao2_container containing all matches (must be unreffed by caller)
1057 * \retval \c NULL on allocation error
1061 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
1064 * \brief Dump cached items to a subscription for a specific entity.
1067 * \param cache The cache to query.
1068 * \param type Type of message to dump (any type if \c NULL).
1069 * \param eid Specific entity id to retrieve. NULL for aggregate.
1071 * \retval ao2_container containing all matches (must be unreffed by caller)
1072 * \retval \c NULL on allocation error
1074 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
1077 * \brief Dump all entity items from the cache to a subscription.
1080 * \param cache The cache to query.
1081 * \param type Type of message to dump (any type if \c NULL).
1083 * \retval ao2_container containing all matches (must be unreffed by caller)
1084 * \retval \c NULL on allocation error
1086 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
1088 /*! \addtogroup StasisTopicsAndMessages
1093 * \brief Object type code for multi user object snapshots
1095 enum stasis_user_multi_object_snapshot_type {
1096 STASIS_UMOS_CHANNEL = 0, /*!< Channel Snapshots */
1097 STASIS_UMOS_BRIDGE, /*!< Bridge Snapshots */
1098 STASIS_UMOS_ENDPOINT, /*!< Endpoint Snapshots */
1101 /*! \brief Number of snapshot types */
1102 #define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
1105 * \brief Message type for custom user defined events with multi object blobs
1106 * \return The stasis_message_type for user event
1109 struct stasis_message_type *ast_multi_user_event_type(void);
1112 * \brief Create a stasis multi object blob
1116 * Multi object blob can store a combination of arbitrary json values
1117 * (the blob) and also snapshots of various other system objects (such
1118 * as channels, bridges, etc) for delivery through a stasis message.
1119 * The multi object blob is first created, then optionally objects
1120 * are added to it, before being attached to a message and delivered
1123 * \param blob Json blob
1125 * \note When used for an ast_multi_user_event_type message, the
1126 * json blob should contain at minimum {eventname: name}.
1128 * \retval ast_multi_object_blob* if succeeded
1129 * \retval NULL if creation failed
1131 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
1134 * \brief Add an object to a multi object blob previously created
1137 * \param multi The multi object blob previously created
1138 * \param type Type code for the object such as channel, bridge, etc.
1139 * \param object Snapshot object of the type supplied to typename
1143 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
1146 * \brief Create and publish a stasis message blob on a channel with it's snapshot
1150 * For compatibility with app_userevent, this creates a multi object
1151 * blob message, attaches the channel snapshot to it, and publishes it
1152 * to the channel's topic.
1154 * \param chan The channel to snapshot and publish event to
1155 * \param type The message type
1156 * \param blob A json blob to publish with the snapshot
1160 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
1167 * \brief Log a message about invalid attempt to access a type.
1169 void stasis_log_bad_type_access(const char *name);
1172 * \brief Boiler-plate messaging macro for defining public message types.
1175 * STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
1176 * .to_ami = foo_to_ami,
1177 * .to_json = foo_to_json,
1178 * .to_event = foo_to_event,
1182 * \param name Name of message type.
1183 * \param ... Virtual table methods for messages of this type.
1186 #define STASIS_MESSAGE_TYPE_DEFN(name, ...) \
1187 static struct stasis_message_vtable _priv_ ## name ## _v = { \
1190 static struct stasis_message_type *_priv_ ## name; \
1191 struct stasis_message_type *name(void) { \
1192 if (_priv_ ## name == NULL) { \
1193 stasis_log_bad_type_access(#name); \
1195 return _priv_ ## name; \
1199 * \brief Boiler-plate messaging macro for defining local message types.
1202 * STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
1203 * .to_ami = foo_to_ami,
1204 * .to_json = foo_to_json,
1205 * .to_event = foo_to_event,
1209 * \param name Name of message type.
1210 * \param ... Virtual table methods for messages of this type.
1213 #define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...) \
1214 static struct stasis_message_vtable _priv_ ## name ## _v = { \
1217 static struct stasis_message_type *_priv_ ## name; \
1218 static struct stasis_message_type *name(void) { \
1219 if (_priv_ ## name == NULL) { \
1220 stasis_log_bad_type_access(#name); \
1222 return _priv_ ## name; \
1226 * \brief Boiler-plate messaging macro for initializing message types.
1229 * if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
1234 * \param name Name of message type.
1235 * \return 0 if initialization is successful.
1236 * \return Non-zero on failure.
1239 #define STASIS_MESSAGE_TYPE_INIT(name) \
1241 ast_assert(_priv_ ## name == NULL); \
1242 stasis_message_type_create(#name, \
1243 &_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
1247 * \brief Boiler-plate messaging macro for cleaning up message types.
1249 * Note that if your type is defined in core instead of a loadable module, you
1250 * should call message type cleanup from an ast_register_cleanup() handler
1251 * instead of an ast_register_atexit() handler.
1253 * The reason is that during an immediate shutdown, loadable modules (which may
1254 * refer to core message types) are not unloaded. While the atexit handlers are
1255 * run, there's a window of time where a module subscription might reference a
1256 * core message type after it's been cleaned up. Which is bad.
1258 * \param name Name of message type.
1261 #define STASIS_MESSAGE_TYPE_CLEANUP(name) \
1263 ao2_cleanup(_priv_ ## name); \
1264 _priv_ ## name = NULL; \
1268 * \brief Initialize the Stasis subsystem.
1269 * \return 0 on success.
1270 * \return Non-zero on error.
1273 int stasis_init(void);
1277 * \brief called by stasis_init() for cache initialization.
1278 * \return 0 on success.
1279 * \return Non-zero on error.
1282 int stasis_cache_init(void);
1286 * \brief called by stasis_init() for config initialization.
1287 * \return 0 on success.
1288 * \return Non-zero on error.
1291 int stasis_config_init(void);
1294 * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
1296 * \brief This group contains the topics, messages and corresponding message types
1297 * found within Asterisk.
1300 #endif /* _ASTERISK_STASIS_H */