20870e6d6b9b22b04437903fcf95c8c1eb528b87
[asterisk/asterisk.git] / include / asterisk / stasis.h
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #ifndef _ASTERISK_STASIS_H
20 #define _ASTERISK_STASIS_H
21
22 /*! \file
23  *
24  * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25  * detailed documentation.
26  *
27  * \author David M. Lee, II <dlee@digium.com>
28  * \since 12
29  *
30  * \page stasis Stasis Message Bus API
31  *
32  * \par Intro
33  *
34  * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35  * within Asterisk. It is designed to be:
36  *  - Loosely coupled; new message types can be added in seperate modules.
37  *  - Easy to use; publishing and subscribing are straightforward operations.
38  *
39  * There are three main concepts for using the Stasis Message Bus:
40  *  - \ref stasis_message
41  *  - \ref stasis_topic
42  *  - \ref stasis_subscription
43  *
44  * \par stasis_message
45  *
46  * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47  * that are sent on the bus. These messages have:
48  *  - a type (as defined by a \ref stasis_message_type)
49  *  - a value - a \c void pointer to an AO2 object
50  *  - a timestamp when it was created
51  *
52  * Once a \ref stasis_message has been created, it is immutable and cannot
53  * change. The same goes for the value of the message (although this cannot be
54  * enforced in code). Messages themselves are reference-counted, AO2 objects,
55  * along with their values. By being both reference counted and immutable,
56  * messages can be shared throughout the system without any concerns for
57  * threading.
58  *
59  * The type of a message is defined by an instance of \ref stasis_message_type,
60  * which can be created by calling stasis_message_type_create(). Message types
61  * are named, which is useful in debugging. It is recommended that the string
62  * name for a message type match the name of the struct that's stored in the
63  * message. For example, name for \ref stasis_cache_update's message type is \c
64  * "stasis_cache_update".
65  *
66  * \par stasis_topic
67  *
68  * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
69  * subscribed, and \ref stasis_message's may be published. Any message published
70  * to the topic is dispatched to all of its subscribers. The topic itself may be
71  * named, which is useful in debugging.
72  *
73  * Topics themselves are reference counted objects. Since topics are referred to
74  * by their subscibers, they will not be freed until all of their subscribers
75  * have unsubscribed. Topics are also thread safe, so no worries about
76  * publishing/subscribing/unsubscribing to a topic concurrently from multiple
77  * threads. It's also designed to handle the case of unsubscribing from a topic
78  * from within the subscription handler.
79  *
80  * \par Forwarding
81  *
82  * There is one special case of topics that's worth noting: forwarding
83  * messages. It's a fairly common use case to want to forward all the messages
84  * published on one topic to another one (for example, an aggregator topic that
85  * publishes all the events from a set of other topics). This can be
86  * accomplished easily using stasis_forward_all(). This sets up the forwarding
87  * between the two topics, and returns a \ref stasis_subscription, which can be
88  * unsubscribed to stop the forwarding.
89  *
90  * \par Caching
91  *
92  * Another common use case is to want to cache certain messages that are
93  * published on the bus. Usually these events are snapshots of the current state
94  * in the system, and it's desirable to query that state from the cache without
95  * locking the original object. It's also desirable for subscribers of the
96  * caching topic to receive messages that have both the old cache value and the
97  * new value being put into the cache. For this, we have stasis_cache_create()
98  * and stasis_caching_topic_create(), providing them with the topic which
99  * publishes the messages that you wish to cache, and a function that can
100  * identify cacheable messages.
101  *
102  * The \ref stasis_cache is designed so that it may be shared amongst several
103  * \ref stasis_caching_topic objects. This allows you to have individual caching
104  * topics per-object (i.e. so you can subscribe to updates for a single object),
105  * and still have a single cache to query for the state of all objects. While a
106  * cache may be shared amongst different message types, such a usage is probably
107  * not a good idea.
108  *
109  * The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
110  * It's a thread safe container, so freely use the stasis_cache_get() and
111  * stasis_cache_dump() to query the cache.
112  *
113  * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
114  * message is wrapped in a \ref stasis_cache_update message which provides the
115  * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
116  * (or \c NULL if the entry was removed from the cache). A
117  * stasis_cache_clear_create() message must be sent to the topic in order to
118  * remove entries from the cache.
119  *
120  * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
121  * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
122  * stasis_caching_topic will not be freed until after it has been unsubscribed,
123  * and all other ao2_ref()'s have been cleaned up.
124  *
125  * The \ref stasis_cache object is a normal AO2 managed object, which can be
126  * release with ao2_cleanup().
127  *
128  * \par stasis_subscriber
129  *
130  * Any topic may be subscribed to by simply providing stasis_subscribe() the
131  * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
132  * data that is passed back to the handler. Invocations on the subscription's
133  * handler are serialized, but different invocations may occur on different
134  * threads (this usually isn't important unless you use thread locals or
135  * something similar).
136  *
137  * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
138  * stasis_subscription. Due to cyclic references, the \ref
139  * stasis_subscription will not be freed until after it has been unsubscribed,
140  * and all other ao2_ref()'s have been cleaned up.
141  *
142  * \par Shutdown
143  *
144  * Subscriptions have two options for unsubscribing, depending upon the context
145  * in which you need to unsubscribe.
146  *
147  * If your subscription is owned by a module, and you must unsubscribe from the
148  * module_unload() function, then you'll want to use the
149  * stasis_unsubscribe_and_join() function. This will block until the final
150  * message has been received on the subscription. Otherwise, there's the danger
151  * of invoking the callback function after it has been unloaded.
152  *
153  * If your subscription is owned by an object, then your object should have an
154  * explicit shutdown() function, which calls stasis_unsubscribe(). In your
155  * subscription handler, when the stasis_subscription_final_message() has been
156  * received, decrement the refcount on your object. In your object's destructor,
157  * you may assert that stasis_subscription_is_done() to validate that the
158  * subscription's callback will no longer be invoked.
159  *
160  * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
161  * an object's destructor. While code that does this may work most of the time,
162  * it's got one big downside. There's a general assumption that object
163  * destruction is non-blocking. If you block the destruction waiting for the
164  * subscription to complete, there's the danger that the subscription may
165  * process a message which will bump the refcount up by one. Then it does
166  * whatever it does, decrements the refcount, which then proceeds to re-destroy
167  * the object. Now you've got hard to reproduce bugs that only show up under
168  * certain loads.
169  */
170
171 #include "asterisk/json.h"
172 #include "asterisk/manager.h"
173 #include "asterisk/utils.h"
174
175 /*! @{ */
176
177 /*!
178  * \brief Metadata about a \ref stasis_message.
179  * \since 12
180  */
181 struct stasis_message_type;
182
183 /*!
184  * \brief Opaque type for a Stasis message.
185  * \since 12
186  */
187 struct stasis_message;
188
189 /*!
190  * \brief Opaque type for a Stasis subscription.
191  * \since 12
192  */
193 struct stasis_subscription;
194
195 /*!
196  * \brief Structure containing callbacks for Stasis message sanitization
197  *
198  * \note If either callback is implemented, both should be implemented since
199  * not all callers may have access to the full snapshot.
200  */
201 struct stasis_message_sanitizer {
202         /*!
203          * \brief Callback which determines whether a channel should be sanitized from
204          * a message based on the channel's unique ID
205          *
206          * \param channel_id The unique ID of the channel
207          *
208          * \retval non-zero if the channel should be left out of the message
209          * \retval zero if the channel should remain in the message
210          */
211         int (*channel_id)(const char *channel_id);
212
213         /*!
214          * \brief Callback which determines whether a channel should be sanitized from
215          * a message based on the channel's snapshot
216          *
217          * \param snapshot A snapshot generated from the channel
218          *
219          * \retval non-zero if the channel should be left out of the message
220          * \retval zero if the channel should remain in the message
221          */
222         int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
223 };
224
225 /*!
226  * \brief Virtual table providing methods for messages.
227  * \since 12
228  */
229 struct stasis_message_vtable {
230         /*!
231          * \brief Build the JSON representation of the message.
232          *
233          * May be \c NULL, or may return \c NULL, to indicate no representation.
234          * The returned object should be ast_json_unref()'ed.
235          *
236          * \param message Message to convert to JSON string.
237          * \param sanitize Snapshot sanitization callback.
238          *
239          * \return Newly allocated JSON message.
240          * \return \c NULL on error.
241          * \return \c NULL if JSON format is not supported.
242          */
243         struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
244
245         /*!
246          * \brief Build the AMI representation of the message.
247          *
248          * May be \c NULL, or may return \c NULL, to indicate no representation.
249          * The returned object should be ao2_cleanup()'ed.
250          *
251          * \param message Message to convert to AMI string.
252          * \return Newly allocated \ref ast_manager_event_blob.
253          * \return \c NULL on error.
254          * \return \c NULL if AMI format is not supported.
255          */
256         struct ast_manager_event_blob *(*to_ami)(
257                 struct stasis_message *message);
258 };
259
260 /*!
261  * \brief Create a new message type.
262  *
263  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
264  * with it.
265  *
266  * \param name Name of the new type.
267  * \param vtable Virtual table of message methods. May be \c NULL.
268  * \return Pointer to the new type.
269  * \return \c NULL on error.
270  * \since 12
271  */
272 struct stasis_message_type *stasis_message_type_create(const char *name,
273         struct stasis_message_vtable *vtable);
274
275 /*!
276  * \brief Gets the name of a given message type
277  * \param type The type to get.
278  * \return Name of the type.
279  * \return \c NULL if \a type is \c NULL.
280  * \since 12
281  */
282 const char *stasis_message_type_name(const struct stasis_message_type *type);
283
284 /*!
285  * \brief Create a new message.
286  *
287  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
288  * with it. Messages are also immutable, and must not be modified after they
289  * are initialized. Especially the \a data in the message.
290  *
291  * \param type Type of the message
292  * \param data Immutable data that is the actual contents of the message
293  *
294  * \return New message
295  * \return \c NULL on error
296  *
297  * \since 12
298  */
299 struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
300
301 /*!
302  * \brief Create a new message for an entity.
303  *
304  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
305  * with it. Messages are also immutable, and must not be modified after they
306  * are initialized. Especially the \a data in the message.
307  *
308  * \param type Type of the message
309  * \param data Immutable data that is the actual contents of the message
310  * \param eid What entity originated this message. (NULL for aggregate)
311  *
312  * \note An aggregate message is a combined representation of the local
313  * and remote entities publishing the message data.  e.g., An aggregate
314  * device state represents the combined device state from the local and
315  * any remote entities publishing state for a device.  e.g., An aggregate
316  * MWI message is the old/new MWI counts accumulated from the local and
317  * any remote entities publishing to a mailbox.
318  *
319  * \retval New message
320  * \retval \c NULL on error
321  *
322  * \since 12.2.0
323  */
324 struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
325
326 /*!
327  * \brief Get the entity id for a \ref stasis_message.
328  * \since 12.2.0
329  *
330  * \param msg Message to get eid.
331  *
332  * \retval Entity id of \a msg
333  * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
334  */
335 const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
336
337 /*!
338  * \brief Get the message type for a \ref stasis_message.
339  * \param msg Message to type
340  * \return Type of \a msg
341  * \return \c NULL if \a msg is \c NULL.
342  * \since 12
343  */
344 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
345
346 /*!
347  * \brief Get the data contained in a message.
348  * \param msg Message.
349  * \return Immutable data pointer
350  * \return \c NULL if msg is \c NULL.
351  * \since 12
352  */
353 void *stasis_message_data(const struct stasis_message *msg);
354
355 /*!
356  * \brief Get the time when a message was created.
357  * \param msg Message.
358  * \return Pointer to the \a timeval when the message was created.
359  * \return \c NULL if msg is \c NULL.
360  * \since 12
361  */
362 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
363
364 /*!
365  * \brief Build the JSON representation of the message.
366  *
367  * May return \c NULL, to indicate no representation. The returned object should
368  * be ast_json_unref()'ed.
369  *
370  * \param message Message to convert to JSON string.
371  * \param sanitize Snapshot sanitization callback.
372  *
373  * \return Newly allocated string with JSON message.
374  * \return \c NULL on error.
375  * \return \c NULL if JSON format is not supported.
376  */
377 struct ast_json *stasis_message_to_json(struct stasis_message *message, struct stasis_message_sanitizer *sanitize);
378
379 /*!
380  * \brief Build the AMI representation of the message.
381  *
382  * May return \c NULL, to indicate no representation. The returned object should
383  * be ao2_cleanup()'ed.
384  *
385  * \param message Message to convert to AMI.
386  * \return \c NULL on error.
387  * \return \c NULL if AMI format is not supported.
388  */
389 struct ast_manager_event_blob *stasis_message_to_ami(
390         struct stasis_message *message);
391
392 /*! @} */
393
394 /*! @{ */
395
396 /*!
397  * \brief A topic to which messages may be posted, and subscribers, well, subscribe
398  * \since 12
399  */
400 struct stasis_topic;
401
402 /*!
403  * \brief Create a new topic.
404  * \param name Name of the new topic.
405  * \return New topic instance.
406  * \return \c NULL on error.
407  * \since 12
408  */
409 struct stasis_topic *stasis_topic_create(const char *name);
410
411 /*!
412  * \brief Return the name of a topic.
413  * \param topic Topic.
414  * \return Name of the topic.
415  * \return \c NULL if topic is \c NULL.
416  * \since 12
417  */
418 const char *stasis_topic_name(const struct stasis_topic *topic);
419
420 /*!
421  * \brief Publish a message to a topic's subscribers.
422  * \param topic Topic.
423  * \param message Message to publish.
424  *
425  * This call is asynchronous and will return immediately upon queueing
426  * the message for delivery to the topic's subscribers.
427  *
428  * \since 12
429  */
430 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
431
432 /*!
433  * \brief Publish a message to a topic's subscribers, synchronizing
434  * on the specified subscriber
435  * \param sub Subscription to synchronize on.
436  * \param message Message to publish.
437  *
438  * The caller of stasis_publish_sync will block until the specified
439  * subscriber completes handling of the message.
440  *
441  * All other subscribers to the topic the \ref stasis_subpscription
442  * is subscribed to are also delivered the message; this delivery however
443  * happens asynchronously.
444  *
445  * \since 12.1.0
446  */
447 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
448
449 /*! @} */
450
451 /*! @{ */
452
453 /*!
454  * \brief Callback function type for Stasis subscriptions.
455  * \param data Data field provided with subscription.
456  * \param message Published message.
457  * \since 12
458  */
459 typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
460
461 /*!
462  * \brief Create a subscription.
463  *
464  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
465  * up this reference), the subscription must be explicitly unsubscribed from its
466  * topic using stasis_unsubscribe().
467  *
468  * The invocations of the callback are serialized, but may not always occur on
469  * the same thread. The invocation order of different subscriptions is
470  * unspecified.
471  *
472  * \param topic Topic to subscribe to.
473  * \param callback Callback function for subscription messages.
474  * \param data Data to be passed to the callback, in addition to the message.
475  * \return New \ref stasis_subscription object.
476  * \return \c NULL on error.
477  * \since 12
478  */
479 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
480         stasis_subscription_cb callback, void *data);
481
482 /*!
483  * \brief Cancel a subscription.
484  *
485  * Note that in an asynchronous system, there may still be messages queued or
486  * in transit to the subscription's callback. These will still be delivered.
487  * There will be a final 'SubscriptionCancelled' message, indicating the
488  * delivery of the final message.
489  *
490  * \param subscription Subscription to cancel.
491  * \return \c NULL for convenience
492  * \since 12
493  */
494 struct stasis_subscription *stasis_unsubscribe(
495         struct stasis_subscription *subscription);
496
497 /*!
498  * \brief Block until the last message is processed on a subscription.
499  *
500  * This function will not return until the \a subscription's callback for the
501  * stasis_subscription_final_message() completes. This allows cleanup routines
502  * to run before unblocking the joining thread.
503  *
504  * \param subscription Subscription to block on.
505  * \since 12
506  */
507 void stasis_subscription_join(struct stasis_subscription *subscription);
508
509 /*!
510  * \brief Returns whether \a subscription has received its final message.
511  *
512  * Note that a subscription is considered done even while the
513  * stasis_subscription_final_message() is being processed. This allows cleanup
514  * routines to check the status of the subscription.
515  *
516  * \param subscription Subscription.
517  * \return True (non-zero) if stasis_subscription_final_message() has been
518  *         received.
519  * \return False (zero) if waiting for the end.
520  */
521 int stasis_subscription_is_done(struct stasis_subscription *subscription);
522
523 /*!
524  * \brief Cancel a subscription, blocking until the last message is processed.
525  *
526  * While normally it's recommended to stasis_unsubscribe() and wait for
527  * stasis_subscription_final_message(), there are times (like during a module
528  * unload) where you have to wait for the final message (otherwise you'll call
529  * a function in a shared module that no longer exists).
530  *
531  * \param subscription Subscription to cancel.
532  * \return \c NULL for convenience
533  * \since 12
534  */
535 struct stasis_subscription *stasis_unsubscribe_and_join(
536         struct stasis_subscription *subscription);
537
538 struct stasis_forward;
539
540 /*!
541  * \brief Create a subscription which forwards all messages from one topic to
542  * another.
543  *
544  * Note that the \a topic parameter of the invoked callback will the be the
545  * \a topic the message was sent to, not the topic the subscriber subscribed to.
546  *
547  * \param from_topic Topic to forward.
548  * \param to_topic Destination topic of forwarded messages.
549  * \return New forwarding subscription.
550  * \return \c NULL on error.
551  * \since 12
552  */
553 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
554         struct stasis_topic *to_topic);
555
556 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
557
558 /*!
559  * \brief Get the unique ID for the subscription.
560  *
561  * \param sub Subscription for which to get the unique ID.
562  * \return Unique ID for the subscription.
563  * \since 12
564  */
565 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
566
567 /*!
568  * \brief Returns whether a subscription is currently subscribed.
569  *
570  * Note that there may still be messages queued up to be dispatched to this
571  * subscription, but the stasis_subscription_final_message() has been enqueued.
572  *
573  * \param sub Subscription to check
574  * \return False (zero) if subscription is not subscribed.
575  * \return True (non-zero) if still subscribed.
576  */
577 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
578
579 /*!
580  * \brief Determine whether a message is the final message to be received on a subscription.
581  *
582  * \param sub Subscription on which the message was received.
583  * \param msg Message to check.
584  * \return zero if the provided message is not the final message.
585  * \return non-zero if the provided message is the final message.
586  * \since 12
587  */
588 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
589
590 /*! \addtogroup StasisTopicsAndMessages
591  * @{
592  */
593
594 /*!
595  * \brief Holds details about changes to subscriptions for the specified topic
596  * \since 12
597  */
598 struct stasis_subscription_change {
599         AST_DECLARE_STRING_FIELDS(
600                 AST_STRING_FIELD(uniqueid);     /*!< The unique ID associated with this subscription */
601                 AST_STRING_FIELD(description);  /*!< The description of the change to the subscription associated with the uniqueid */
602         );
603         struct stasis_topic *topic;             /*!< The topic the subscription is/was subscribing to */
604 };
605
606 /*!
607  * \brief Gets the message type for subscription change notices
608  * \return The stasis_message_type for subscription change notices
609  * \since 12
610  */
611 struct stasis_message_type *stasis_subscription_change_type(void);
612
613 /*! @} */
614
615 /*! @{ */
616
617 /*!
618  * \brief Pool for topic aggregation
619  */
620 struct stasis_topic_pool;
621
622 /*!
623  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
624  * \param pooled_topic Topic to which messages will be routed
625  * \return the new stasis_topic_pool
626  * \return \c NULL on failure
627  */
628 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
629
630 /*!
631  * \brief Find or create a topic in the pool
632  * \param pool Pool for which to get the topic
633  * \param topic_name Name of the topic to get
634  * \return The already stored or newly allocated topic
635  * \return \c NULL if the topic was not found and could not be allocated
636  */
637 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
638
639 /*! @} */
640
641 /*! \addtogroup StasisTopicsAndMessages
642  * @{
643  */
644
645 /*!
646  * \brief Message type for cache update messages.
647  * \return Message type for cache update messages.
648  * \since 12
649  */
650 struct stasis_message_type *stasis_cache_update_type(void);
651
652 /*!
653  * \brief Cache update message
654  * \since 12
655  */
656 struct stasis_cache_update {
657         /*! \brief Convenience reference to snapshot type */
658         struct stasis_message_type *type;
659         /*! \brief Old value from the cache */
660         struct stasis_message *old_snapshot;
661         /*! \brief New value */
662         struct stasis_message *new_snapshot;
663 };
664
665 /*!
666  * \brief Message type for clearing a message from a stasis cache.
667  * \since 12
668  */
669 struct stasis_message_type *stasis_cache_clear_type(void);
670
671 /*! @} */
672
673 /*! @{ */
674
675 /*!
676  * \brief A message cache, for use with \ref stasis_caching_topic.
677  * \since 12
678  */
679 struct stasis_cache;
680
681 /*! Cache entry used for calculating the aggregate snapshot. */
682 struct stasis_cache_entry;
683
684 /*!
685  * \brief A topic wrapper, which caches certain messages.
686  * \since 12
687  */
688 struct stasis_caching_topic;
689
690
691 /*!
692  * \brief Callback extract a unique identity from a snapshot message.
693  *
694  * This identity is unique to the underlying object of the snapshot, such as the
695  * UniqueId field of a channel.
696  *
697  * \param message Message to extract id from.
698  * \return String representing the snapshot's id.
699  * \return \c NULL if the message_type of the message isn't a handled snapshot.
700  * \since 12
701  */
702 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
703
704 /*!
705  * \brief Callback to calculate the aggregate cache entry.
706  * \since 12.2.0
707  *
708  * \param entry Cache entry to calculate a new aggregate snapshot.
709  * \param new_snapshot The shapshot that is being updated.
710  *
711  * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
712  * if a new aggregate could not be calculated because of error.
713  *
714  * \note An aggregate message is a combined representation of the local
715  * and remote entities publishing the message data.  e.g., An aggregate
716  * device state represents the combined device state from the local and
717  * any remote entities publishing state for a device.  e.g., An aggregate
718  * MWI message is the old/new MWI counts accumulated from the local and
719  * any remote entities publishing to a mailbox.
720  *
721  * \return New aggregate-snapshot calculated on success.
722  * Caller has a reference on return.
723  */
724 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
725
726 /*!
727  * \brief Callback to publish the aggregate cache entry message.
728  * \since 12.2.0
729  *
730  * \details
731  * Once an aggregate message is calculated.  This callback publishes the
732  * message so subscribers will know the new value of an aggregated state.
733  *
734  * \param topic The aggregate message may be published to this topic.
735  *        It is the topic to which the cache itself is subscribed.
736  * \param aggregate The aggregate shapshot message to publish.
737  *
738  * \note It is up to the function to determine if there is a better topic
739  * the aggregate message should be published over.
740  *
741  * \note An aggregate message is a combined representation of the local
742  * and remote entities publishing the message data.  e.g., An aggregate
743  * device state represents the combined device state from the local and
744  * any remote entities publishing state for a device.  e.g., An aggregate
745  * MWI message is the old/new MWI counts accumulated from the local and
746  * any remote entities publishing to a mailbox.
747  *
748  * \return Nothing
749  */
750 typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
751
752 /*!
753  * \brief Get the aggregate cache entry snapshot.
754  * \since 12.2.0
755  *
756  * \param entry Cache entry to get the aggregate snapshot.
757  *
758  * \note A reference is not given to the returned pointer so don't unref it.
759  *
760  * \note An aggregate message is a combined representation of the local
761  * and remote entities publishing the message data.  e.g., An aggregate
762  * device state represents the combined device state from the local and
763  * any remote entities publishing state for a device.  e.g., An aggregate
764  * MWI message is the old/new MWI counts accumulated from the local and
765  * any remote entities publishing to a mailbox.
766  *
767  * \retval Aggregate-snapshot in cache.
768  * \retval NULL if not present.
769  */
770 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
771
772 /*!
773  * \brief Get the local entity's cache entry snapshot.
774  * \since 12.2.0
775  *
776  * \param entry Cache entry to get the local entity's snapshot.
777  *
778  * \note A reference is not given to the returned pointer so don't unref it.
779  *
780  * \retval Internal-snapshot in cache.
781  * \retval NULL if not present.
782  */
783 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
784
785 /*!
786  * \brief Get a remote entity's cache entry snapshot by index.
787  * \since 12.2.0
788  *
789  * \param entry Cache entry to get a remote entity's snapshot.
790  * \param idx Which remote entity's snapshot to get.
791  *
792  * \note A reference is not given to the returned pointer so don't unref it.
793  *
794  * \retval Remote-entity-snapshot in cache.
795  * \retval NULL if not present.
796  */
797 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
798
799 /*!
800  * \brief Create a cache.
801  *
802  * This is the backend store for a \ref stasis_caching_topic. The cache is
803  * thread safe, allowing concurrent reads and writes.
804  *
805  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
806  *
807  * \param id_fn Callback to extract the id from a snapshot message.
808  *
809  * \retval New cache indexed by \a id_fn.
810  * \retval \c NULL on error
811  *
812  * \since 12
813  */
814 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
815
816 /*!
817  * \brief Create a cache.
818  *
819  * This is the backend store for a \ref stasis_caching_topic. The cache is
820  * thread safe, allowing concurrent reads and writes.
821  *
822  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
823  *
824  * \param id_fn Callback to extract the id from a snapshot message.
825  * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
826  * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
827  *
828  * \note An aggregate message is a combined representation of the local
829  * and remote entities publishing the message data.  e.g., An aggregate
830  * device state represents the combined device state from the local and
831  * any remote entities publishing state for a device.  e.g., An aggregate
832  * MWI message is the old/new MWI counts accumulated from the local and
833  * any remote entities publishing to a mailbox.
834  *
835  * \retval New cache indexed by \a id_fn.
836  * \retval \c NULL on error
837  *
838  * \since 12.2.0
839  */
840 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
841
842 /*!
843  * \brief Create a topic which monitors and caches messages from another topic.
844  *
845  * The idea is that some topics publish 'snapshots' of some other object's state
846  * that should be cached. When these snapshot messages are received, the cache
847  * is updated, and a stasis_cache_update() message is forwarded, which has both
848  * the original snapshot message and the new message.
849  *
850  * The returned object is AO2 managed, so ao2_cleanup() when done with it.
851  *
852  * \param original_topic Topic publishing snapshot messages.
853  * \param cache Backend cache in which to keep snapshots.
854  * \return New topic which changes snapshot messages to stasis_cache_update()
855  *         messages, and forwards all other messages from the original topic.
856  * \return \c NULL on error
857  * \since 12
858  */
859 struct stasis_caching_topic *stasis_caching_topic_create(
860         struct stasis_topic *original_topic, struct stasis_cache *cache);
861
862 /*!
863  * \brief Unsubscribes a caching topic from its upstream topic.
864  *
865  * This function returns immediately, so be sure to cleanup when
866  * stasis_subscription_final_message() is received.
867  *
868  * \param caching_topic Caching topic to unsubscribe
869  * \return \c NULL for convenience
870  * \since 12
871  */
872 struct stasis_caching_topic *stasis_caching_unsubscribe(
873         struct stasis_caching_topic *caching_topic);
874
875 /*!
876  * \brief Unsubscribes a caching topic from its upstream topic, blocking until
877  * all messages have been forwarded.
878  *
879  * See stasis_unsubscriben_and_join() for more info on when to use this as
880  * opposed to stasis_caching_unsubscribe().
881  *
882  * \param caching_topic Caching topic to unsubscribe
883  * \return \c NULL for convenience
884  * \since 12
885  */
886 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
887         struct stasis_caching_topic *caching_topic);
888
889 /*!
890  * \brief Returns the topic of cached events from a caching topics.
891  * \param caching_topic The caching topic.
892  * \return The topic that publishes cache update events, along with passthrough
893  *         events from the underlying topic.
894  * \return \c NULL if \a caching_topic is \c NULL.
895  * \since 12
896  */
897 struct stasis_topic *stasis_caching_get_topic(
898         struct stasis_caching_topic *caching_topic);
899
900 /*!
901  * \brief A message which instructs the caching topic to remove an entry from
902  * its cache.
903  *
904  * \param message Message representative of the cache entry that should be
905  *                cleared. This will become the data held in the
906  *                stasis_cache_clear message.
907  *
908  * \return Message which, when sent to a \ref stasis_caching_topic, will clear
909  *         the item from the cache.
910  * \return \c NULL on error.
911  * \since 12
912  */
913 struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
914
915 /*!
916  * \brief Retrieve an item from the cache for the ast_eid_default entity.
917  *
918  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
919  *
920  * \param cache The cache to query.
921  * \param type Type of message to retrieve.
922  * \param id Identity of the snapshot to retrieve.
923  *
924  * \retval Message from the cache.
925  * \retval \c NULL if message is not found.
926  *
927  * \since 12
928  */
929 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
930
931 /*!
932  * \brief Retrieve an item from the cache for a specific entity.
933  *
934  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
935  *
936  * \param cache The cache to query.
937  * \param type Type of message to retrieve.
938  * \param id Identity of the snapshot to retrieve.
939  * \param eid Specific entity id to retrieve.  NULL for aggregate.
940  *
941  * \note An aggregate message is a combined representation of the local
942  * and remote entities publishing the message data.  e.g., An aggregate
943  * device state represents the combined device state from the local and
944  * any remote entities publishing state for a device.  e.g., An aggregate
945  * MWI message is the old/new MWI counts accumulated from the local and
946  * any remote entities publishing to a mailbox.
947  *
948  * \retval Message from the cache.
949  * \retval \c NULL if message is not found.
950  *
951  * \since 12.2.0
952  */
953 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
954
955 /*!
956  * \brief Retrieve all matching entity items from the cache.
957  * \since 12.2.0
958  *
959  * \param cache The cache to query.
960  * \param type Type of message to retrieve.
961  * \param id Identity of the snapshot to retrieve.
962  *
963  * \retval Container of matching items found.
964  * \retval \c NULL if error.
965  */
966 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
967
968 /*!
969  * \brief Dump cached items to a subscription for the ast_eid_default entity.
970  *
971  * \param cache The cache to query.
972  * \param type Type of message to dump (any type if \c NULL).
973  *
974  * \retval ao2_container containing all matches (must be unreffed by caller)
975  * \retval \c NULL on allocation error
976  *
977  * \since 12
978  */
979 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
980
981 /*!
982  * \brief Dump cached items to a subscription for a specific entity.
983  * \since 12.2.0
984  *
985  * \param cache The cache to query.
986  * \param type Type of message to dump (any type if \c NULL).
987  * \param eid Specific entity id to retrieve.  NULL for aggregate.
988  *
989  * \retval ao2_container containing all matches (must be unreffed by caller)
990  * \retval \c NULL on allocation error
991  */
992 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
993
994 /*!
995  * \brief Dump all entity items from the cache to a subscription.
996  * \since 12.2.0
997  *
998  * \param cache The cache to query.
999  * \param type Type of message to dump (any type if \c NULL).
1000  *
1001  * \retval ao2_container containing all matches (must be unreffed by caller)
1002  * \retval \c NULL on allocation error
1003  */
1004 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
1005
1006 /*! @} */
1007
1008 /*! @{ */
1009
1010 /*!
1011  * \internal
1012  * \brief Log a message about invalid attempt to access a type.
1013  */
1014 void stasis_log_bad_type_access(const char *name);
1015
1016 /*!
1017  * \brief Boiler-plate messaging macro for defining public message types.
1018  *
1019  * \code
1020  *      STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
1021  *              .to_ami = foo_to_ami,
1022  *              .to_json = foo_to_json,
1023  *              );
1024  * \endcode
1025  *
1026  * \param name Name of message type.
1027  * \param ... Virtual table methods for messages of this type.
1028  * \since 12
1029  */
1030 #define STASIS_MESSAGE_TYPE_DEFN(name, ...)                             \
1031         static struct stasis_message_vtable _priv_ ## name ## _v = {    \
1032                 __VA_ARGS__                                             \
1033         };                                                              \
1034         static struct stasis_message_type *_priv_ ## name;              \
1035         struct stasis_message_type *name(void) {                        \
1036                 if (_priv_ ## name == NULL) {                           \
1037                         stasis_log_bad_type_access(#name);              \
1038                 }                                                       \
1039                 return _priv_ ## name;                                  \
1040         }
1041
1042 /*!
1043  * \brief Boiler-plate messaging macro for defining local message types.
1044  *
1045  * \code
1046  *      STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
1047  *              .to_ami = foo_to_ami,
1048  *              .to_json = foo_to_json,
1049  *              );
1050  * \endcode
1051  *
1052  * \param name Name of message type.
1053  * \param ... Virtual table methods for messages of this type.
1054  * \since 12
1055  */
1056 #define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...)                       \
1057         static struct stasis_message_vtable _priv_ ## name ## _v = {    \
1058                 __VA_ARGS__                                             \
1059         };                                                              \
1060         static struct stasis_message_type *_priv_ ## name;              \
1061         static struct stasis_message_type *name(void) {                 \
1062                 if (_priv_ ## name == NULL) {                           \
1063                         stasis_log_bad_type_access(#name);              \
1064                 }                                                       \
1065                 return _priv_ ## name;                                  \
1066         }
1067
1068 /*!
1069 * \brief Boiler-plate messaging macro for initializing message types.
1070  *
1071  * \code
1072  *      if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
1073  *              return -1;
1074  *      }
1075  * \endcode
1076  *
1077  * \param name Name of message type.
1078  * \return 0 if initialization is successful.
1079  * \return Non-zero on failure.
1080  * \since 12
1081  */
1082 #define STASIS_MESSAGE_TYPE_INIT(name)                                  \
1083         ({                                                              \
1084                 ast_assert(_priv_ ## name == NULL);                     \
1085                 _priv_ ## name = stasis_message_type_create(#name,      \
1086                         &_priv_ ## name ## _v);                         \
1087                 _priv_ ## name ? 0 : -1;                                \
1088         })
1089
1090 /*!
1091  * \brief Boiler-plate messaging macro for cleaning up message types.
1092  *
1093  * Note that if your type is defined in core instead of a loadable module, you
1094  * should call message type cleanup from an ast_register_cleanup() handler
1095  * instead of an ast_register_atexit() handler.
1096  *
1097  * The reason is that during an immediate shutdown, loadable modules (which may
1098  * refer to core message types) are not unloaded. While the atexit handlers are
1099  * run, there's a window of time where a module subscription might reference a
1100  * core message type after it's been cleaned up. Which is bad.
1101  *
1102  * \param name Name of message type.
1103  * \since 12
1104  */
1105 #define STASIS_MESSAGE_TYPE_CLEANUP(name)       \
1106         ({                                      \
1107                 ao2_cleanup(_priv_ ## name);    \
1108                 _priv_ ## name = NULL;          \
1109         })
1110
1111 /*! @} */
1112
1113 /*! @{ */
1114
1115 /*!
1116  * \brief Initialize the Stasis subsystem.
1117  * \return 0 on success.
1118  * \return Non-zero on error.
1119  * \since 12
1120  */
1121 int stasis_init(void);
1122
1123 /*! @} */
1124
1125 /*! @{ */
1126
1127 /*!
1128  * \internal
1129  * \brief called by stasis_init() for cache initialization.
1130  * \return 0 on success.
1131  * \return Non-zero on error.
1132  * \since 12
1133  */
1134 int stasis_cache_init(void);
1135
1136 /*!
1137  * \internal
1138  * \brief called by stasis_init() for config initialization.
1139  * \return 0 on success.
1140  * \return Non-zero on error.
1141  * \since 12
1142  */
1143 int stasis_config_init(void);
1144
1145 /*! @} */
1146
1147 /*!
1148  * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
1149  *
1150  * This group contains the topics, messages and corresponding message types
1151  * found within Asterisk.
1152  */
1153
1154 #endif /* _ASTERISK_STASIS_H */