Stasis: Allow message types to be blocked
[asterisk/asterisk.git] / include / asterisk / stasis.h
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #ifndef _ASTERISK_STASIS_H
20 #define _ASTERISK_STASIS_H
21
22 /*! \file
23  *
24  * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25  * detailed documentation.
26  *
27  * \author David M. Lee, II <dlee@digium.com>
28  * \since 12
29  *
30  * \page stasis Stasis Message Bus API
31  *
32  * \par Intro
33  *
34  * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35  * within Asterisk. It is designed to be:
36  *  - Loosely coupled; new message types can be added in seperate modules.
37  *  - Easy to use; publishing and subscribing are straightforward operations.
38  *
39  * There are three main concepts for using the Stasis Message Bus:
40  *  - \ref stasis_message
41  *  - \ref stasis_topic
42  *  - \ref stasis_subscription
43  *
44  * \par stasis_message
45  *
46  * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47  * that are sent on the bus. These messages have:
48  *  - a type (as defined by a \ref stasis_message_type)
49  *  - a value - a \c void pointer to an AO2 object
50  *  - a timestamp when it was created
51  *
52  * Once a \ref stasis_message has been created, it is immutable and cannot
53  * change. The same goes for the value of the message (although this cannot be
54  * enforced in code). Messages themselves are reference-counted, AO2 objects,
55  * along with their values. By being both reference counted and immutable,
56  * messages can be shared throughout the system without any concerns for
57  * threading.
58  *
59  * The type of a message is defined by an instance of \ref stasis_message_type,
60  * which can be created by calling stasis_message_type_create(). Message types
61  * are named, which is useful in debugging. It is recommended that the string
62  * name for a message type match the name of the struct that's stored in the
63  * message. For example, name for \ref stasis_cache_update's message type is \c
64  * "stasis_cache_update".
65  *
66  * \par stasis_topic
67  *
68  * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
69  * subscribed, and \ref stasis_message's may be published. Any message published
70  * to the topic is dispatched to all of its subscribers. The topic itself may be
71  * named, which is useful in debugging.
72  *
73  * Topics themselves are reference counted objects. Since topics are referred to
74  * by their subscibers, they will not be freed until all of their subscribers
75  * have unsubscribed. Topics are also thread safe, so no worries about
76  * publishing/subscribing/unsubscribing to a topic concurrently from multiple
77  * threads. It's also designed to handle the case of unsubscribing from a topic
78  * from within the subscription handler.
79  *
80  * \par Forwarding
81  *
82  * There is one special case of topics that's worth noting: forwarding
83  * messages. It's a fairly common use case to want to forward all the messages
84  * published on one topic to another one (for example, an aggregator topic that
85  * publishes all the events from a set of other topics). This can be
86  * accomplished easily using stasis_forward_all(). This sets up the forwarding
87  * between the two topics, and returns a \ref stasis_subscription, which can be
88  * unsubscribed to stop the forwarding.
89  *
90  * \par Caching
91  *
92  * Another common use case is to want to cache certain messages that are
93  * published on the bus. Usually these events are snapshots of the current state
94  * in the system, and it's desirable to query that state from the cache without
95  * locking the original object. It's also desirable for subscribers of the
96  * caching topic to receive messages that have both the old cache value and the
97  * new value being put into the cache. For this, we have stasis_cache_create()
98  * and stasis_caching_topic_create(), providing them with the topic which
99  * publishes the messages that you wish to cache, and a function that can
100  * identify cacheable messages.
101  *
102  * The \ref stasis_cache is designed so that it may be shared amongst several
103  * \ref stasis_caching_topic objects. This allows you to have individual caching
104  * topics per-object (i.e. so you can subscribe to updates for a single object),
105  * and still have a single cache to query for the state of all objects. While a
106  * cache may be shared amongst different message types, such a usage is probably
107  * not a good idea.
108  *
109  * The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
110  * It's a thread safe container, so freely use the stasis_cache_get() and
111  * stasis_cache_dump() to query the cache.
112  *
113  * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
114  * message is wrapped in a \ref stasis_cache_update message which provides the
115  * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
116  * (or \c NULL if the entry was removed from the cache). A
117  * stasis_cache_clear_create() message must be sent to the topic in order to
118  * remove entries from the cache.
119  *
120  * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
121  * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
122  * stasis_caching_topic will not be freed until after it has been unsubscribed,
123  * and all other ao2_ref()'s have been cleaned up.
124  *
125  * The \ref stasis_cache object is a normal AO2 managed object, which can be
126  * release with ao2_cleanup().
127  *
128  * \par stasis_subscriber
129  *
130  * Any topic may be subscribed to by simply providing stasis_subscribe() the
131  * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
132  * data that is passed back to the handler. Invocations on the subscription's
133  * handler are serialized, but different invocations may occur on different
134  * threads (this usually isn't important unless you use thread locals or
135  * something similar).
136  *
137  * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
138  * stasis_subscription. Due to cyclic references, the \ref
139  * stasis_subscription will not be freed until after it has been unsubscribed,
140  * and all other ao2_ref()'s have been cleaned up.
141  *
142  * \par Shutdown
143  *
144  * Subscriptions have two options for unsubscribing, depending upon the context
145  * in which you need to unsubscribe.
146  *
147  * If your subscription is owned by a module, and you must unsubscribe from the
148  * module_unload() function, then you'll want to use the
149  * stasis_unsubscribe_and_join() function. This will block until the final
150  * message has been received on the subscription. Otherwise, there's the danger
151  * of invoking the callback function after it has been unloaded.
152  *
153  * If your subscription is owned by an object, then your object should have an
154  * explicit shutdown() function, which calls stasis_unsubscribe(). In your
155  * subscription handler, when the stasis_subscription_final_message() has been
156  * received, decrement the refcount on your object. In your object's destructor,
157  * you may assert that stasis_subscription_is_done() to validate that the
158  * subscription's callback will no longer be invoked.
159  *
160  * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
161  * an object's destructor. While code that does this may work most of the time,
162  * it's got one big downside. There's a general assumption that object
163  * destruction is non-blocking. If you block the destruction waiting for the
164  * subscription to complete, there's the danger that the subscription may
165  * process a message which will bump the refcount up by one. Then it does
166  * whatever it does, decrements the refcount, which then proceeds to re-destroy
167  * the object. Now you've got hard to reproduce bugs that only show up under
168  * certain loads.
169  */
170
171 #include "asterisk/json.h"
172 #include "asterisk/manager.h"
173 #include "asterisk/utils.h"
174 #include "asterisk/event.h"
175
176 /*! @{ */
177
178 /*!
179  * \brief Metadata about a \ref stasis_message.
180  * \since 12
181  */
182 struct stasis_message_type;
183
184 /*!
185  * \brief Opaque type for a Stasis message.
186  * \since 12
187  */
188 struct stasis_message;
189
190 /*!
191  * \brief Opaque type for a Stasis subscription.
192  * \since 12
193  */
194 struct stasis_subscription;
195
196 /*!
197  * \brief Structure containing callbacks for Stasis message sanitization
198  *
199  * \note If either callback is implemented, both should be implemented since
200  * not all callers may have access to the full snapshot.
201  */
202 struct stasis_message_sanitizer {
203         /*!
204          * \brief Callback which determines whether a channel should be sanitized from
205          * a message based on the channel's unique ID
206          *
207          * \param channel_id The unique ID of the channel
208          *
209          * \retval non-zero if the channel should be left out of the message
210          * \retval zero if the channel should remain in the message
211          */
212         int (*channel_id)(const char *channel_id);
213
214         /*!
215          * \brief Callback which determines whether a channel should be sanitized from
216          * a message based on the channel's snapshot
217          *
218          * \param snapshot A snapshot generated from the channel
219          *
220          * \retval non-zero if the channel should be left out of the message
221          * \retval zero if the channel should remain in the message
222          */
223         int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
224 };
225
226 /*!
227  * \brief Virtual table providing methods for messages.
228  * \since 12
229  */
230 struct stasis_message_vtable {
231         /*!
232          * \brief Build the JSON representation of the message.
233          *
234          * May be \c NULL, or may return \c NULL, to indicate no representation.
235          * The returned object should be ast_json_unref()'ed.
236          *
237          * \param message Message to convert to JSON string.
238          * \param sanitize Snapshot sanitization callback.
239          *
240          * \return Newly allocated JSON message.
241          * \return \c NULL on error.
242          * \return \c NULL if JSON format is not supported.
243          */
244         struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
245
246         /*!
247          * \brief Build the AMI representation of the message.
248          *
249          * May be \c NULL, or may return \c NULL, to indicate no representation.
250          * The returned object should be ao2_cleanup()'ed.
251          *
252          * \param message Message to convert to AMI string.
253          * \return Newly allocated \ref ast_manager_event_blob.
254          * \return \c NULL on error.
255          * \return \c NULL if AMI format is not supported.
256          */
257         struct ast_manager_event_blob *(*to_ami)(
258                 struct stasis_message *message);
259
260         /*!
261          * \since 12.3.0
262          * \brief Build the \ref ast_event representation of the message.
263          *
264          * May be \c NULL, or may return \c NULL, to indicate no representation.
265          * The returned object should be free'd.
266          *
267          * \param message Message to convert to an \ref ast_event.
268          * \return Newly allocated \ref ast_event.
269          * \return \c NULL on error.
270          * \return \c NULL if AMI format is not supported.
271          */
272         struct ast_event *(*to_event)(
273                 struct stasis_message *message);
274 };
275
276 /*!
277  * \brief Return code for Stasis message type creation attempts
278  */
279 enum stasis_message_type_result {
280         STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
281         STASIS_MESSAGE_TYPE_SUCCESS,    /*!< Message type was created successfully */
282         STASIS_MESSAGE_TYPE_DECLINED,   /*!< Message type was not created due to configuration */
283 };
284
285 /*!
286  * \brief Create a new message type.
287  *
288  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
289  * with it.
290  *
291  * \param name Name of the new type.
292  * \param vtable Virtual table of message methods. May be \c NULL.
293  * \param[out] result The location where the new message type will be placed
294  *
295  * \note Stasis message type creation may be declined if the message type is disabled
296  *
297  * \returns A stasis_message_type_result enum
298  * \since 12
299  */
300 enum stasis_message_type_result stasis_message_type_create(const char *name,
301         struct stasis_message_vtable *vtable, struct stasis_message_type **result);
302
303 /*!
304  * \brief Gets the name of a given message type
305  * \param type The type to get.
306  * \return Name of the type.
307  * \return \c NULL if \a type is \c NULL.
308  * \since 12
309  */
310 const char *stasis_message_type_name(const struct stasis_message_type *type);
311
312 /*!
313  * \brief Check whether a message type is declined
314  *
315  * \param name The name of the message type to check
316  *
317  * \retval zero The message type is not declined
318  * \retval non-zero The message type is declined
319  */
320 int stasis_message_type_declined(const char *name);
321
322 /*!
323  * \brief Create a new message.
324  *
325  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
326  * with it. Messages are also immutable, and must not be modified after they
327  * are initialized. Especially the \a data in the message.
328  *
329  * \param type Type of the message
330  * \param data Immutable data that is the actual contents of the message
331  *
332  * \return New message
333  * \return \c NULL on error
334  *
335  * \since 12
336  */
337 struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
338
339 /*!
340  * \brief Create a new message for an entity.
341  *
342  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
343  * with it. Messages are also immutable, and must not be modified after they
344  * are initialized. Especially the \a data in the message.
345  *
346  * \param type Type of the message
347  * \param data Immutable data that is the actual contents of the message
348  * \param eid What entity originated this message. (NULL for aggregate)
349  *
350  * \note An aggregate message is a combined representation of the local
351  * and remote entities publishing the message data.  e.g., An aggregate
352  * device state represents the combined device state from the local and
353  * any remote entities publishing state for a device.  e.g., An aggregate
354  * MWI message is the old/new MWI counts accumulated from the local and
355  * any remote entities publishing to a mailbox.
356  *
357  * \retval New message
358  * \retval \c NULL on error
359  *
360  * \since 12.2.0
361  */
362 struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
363
364 /*!
365  * \brief Get the entity id for a \ref stasis_message.
366  * \since 12.2.0
367  *
368  * \param msg Message to get eid.
369  *
370  * \retval Entity id of \a msg
371  * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
372  */
373 const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
374
375 /*!
376  * \brief Get the message type for a \ref stasis_message.
377  * \param msg Message to type
378  * \return Type of \a msg
379  * \return \c NULL if \a msg is \c NULL.
380  * \since 12
381  */
382 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
383
384 /*!
385  * \brief Get the data contained in a message.
386  * \param msg Message.
387  * \return Immutable data pointer
388  * \return \c NULL if msg is \c NULL.
389  * \since 12
390  */
391 void *stasis_message_data(const struct stasis_message *msg);
392
393 /*!
394  * \brief Get the time when a message was created.
395  * \param msg Message.
396  * \return Pointer to the \a timeval when the message was created.
397  * \return \c NULL if msg is \c NULL.
398  * \since 12
399  */
400 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
401
402 /*!
403  * \brief Build the JSON representation of the message.
404  *
405  * May return \c NULL, to indicate no representation. The returned object should
406  * be ast_json_unref()'ed.
407  *
408  * \param message Message to convert to JSON string.
409  * \param sanitize Snapshot sanitization callback.
410  *
411  * \return Newly allocated string with JSON message.
412  * \return \c NULL on error.
413  * \return \c NULL if JSON format is not supported.
414  */
415 struct ast_json *stasis_message_to_json(struct stasis_message *message, struct stasis_message_sanitizer *sanitize);
416
417 /*!
418  * \brief Build the AMI representation of the message.
419  *
420  * May return \c NULL, to indicate no representation. The returned object should
421  * be ao2_cleanup()'ed.
422  *
423  * \param message Message to convert to AMI.
424  * \return \c NULL on error.
425  * \return \c NULL if AMI format is not supported.
426  */
427 struct ast_manager_event_blob *stasis_message_to_ami(
428         struct stasis_message *message);
429
430 /*!
431  * \brief Build the \ref AstGenericEvents representation of the message.
432  *
433  * May return \c NULL, to indicate no representation. The returned object should
434  * be disposed of via \ref ast_event_destroy.
435  *
436  * \param message Message to convert to AMI.
437  * \return \c NULL on error.
438  * \return \c NULL if AMI format is not supported.
439  */
440 struct ast_event *stasis_message_to_event(
441         struct stasis_message *message);
442
443 /*! @} */
444
445 /*! @{ */
446
447 /*!
448  * \brief A topic to which messages may be posted, and subscribers, well, subscribe
449  * \since 12
450  */
451 struct stasis_topic;
452
453 /*!
454  * \brief Create a new topic.
455  * \param name Name of the new topic.
456  * \return New topic instance.
457  * \return \c NULL on error.
458  * \since 12
459  */
460 struct stasis_topic *stasis_topic_create(const char *name);
461
462 /*!
463  * \brief Return the name of a topic.
464  * \param topic Topic.
465  * \return Name of the topic.
466  * \return \c NULL if topic is \c NULL.
467  * \since 12
468  */
469 const char *stasis_topic_name(const struct stasis_topic *topic);
470
471 /*!
472  * \brief Publish a message to a topic's subscribers.
473  * \param topic Topic.
474  * \param message Message to publish.
475  *
476  * This call is asynchronous and will return immediately upon queueing
477  * the message for delivery to the topic's subscribers.
478  *
479  * \since 12
480  */
481 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
482
483 /*!
484  * \brief Publish a message to a topic's subscribers, synchronizing
485  * on the specified subscriber
486  * \param sub Subscription to synchronize on.
487  * \param message Message to publish.
488  *
489  * The caller of stasis_publish_sync will block until the specified
490  * subscriber completes handling of the message.
491  *
492  * All other subscribers to the topic the \ref stasis_subpscription
493  * is subscribed to are also delivered the message; this delivery however
494  * happens asynchronously.
495  *
496  * \since 12.1.0
497  */
498 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
499
500 /*! @} */
501
502 /*! @{ */
503
504 /*!
505  * \brief Callback function type for Stasis subscriptions.
506  * \param data Data field provided with subscription.
507  * \param message Published message.
508  * \since 12
509  */
510 typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
511
512 /*!
513  * \brief Create a subscription.
514  *
515  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
516  * up this reference), the subscription must be explicitly unsubscribed from its
517  * topic using stasis_unsubscribe().
518  *
519  * The invocations of the callback are serialized, but may not always occur on
520  * the same thread. The invocation order of different subscriptions is
521  * unspecified.
522  *
523  * \param topic Topic to subscribe to.
524  * \param callback Callback function for subscription messages.
525  * \param data Data to be passed to the callback, in addition to the message.
526  * \return New \ref stasis_subscription object.
527  * \return \c NULL on error.
528  * \since 12
529  */
530 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
531         stasis_subscription_cb callback, void *data);
532
533 /*!
534  * \brief Cancel a subscription.
535  *
536  * Note that in an asynchronous system, there may still be messages queued or
537  * in transit to the subscription's callback. These will still be delivered.
538  * There will be a final 'SubscriptionCancelled' message, indicating the
539  * delivery of the final message.
540  *
541  * \param subscription Subscription to cancel.
542  * \return \c NULL for convenience
543  * \since 12
544  */
545 struct stasis_subscription *stasis_unsubscribe(
546         struct stasis_subscription *subscription);
547
548 /*!
549  * \brief Block until the last message is processed on a subscription.
550  *
551  * This function will not return until the \a subscription's callback for the
552  * stasis_subscription_final_message() completes. This allows cleanup routines
553  * to run before unblocking the joining thread.
554  *
555  * \param subscription Subscription to block on.
556  * \since 12
557  */
558 void stasis_subscription_join(struct stasis_subscription *subscription);
559
560 /*!
561  * \brief Returns whether \a subscription has received its final message.
562  *
563  * Note that a subscription is considered done even while the
564  * stasis_subscription_final_message() is being processed. This allows cleanup
565  * routines to check the status of the subscription.
566  *
567  * \param subscription Subscription.
568  * \return True (non-zero) if stasis_subscription_final_message() has been
569  *         received.
570  * \return False (zero) if waiting for the end.
571  */
572 int stasis_subscription_is_done(struct stasis_subscription *subscription);
573
574 /*!
575  * \brief Cancel a subscription, blocking until the last message is processed.
576  *
577  * While normally it's recommended to stasis_unsubscribe() and wait for
578  * stasis_subscription_final_message(), there are times (like during a module
579  * unload) where you have to wait for the final message (otherwise you'll call
580  * a function in a shared module that no longer exists).
581  *
582  * \param subscription Subscription to cancel.
583  * \return \c NULL for convenience
584  * \since 12
585  */
586 struct stasis_subscription *stasis_unsubscribe_and_join(
587         struct stasis_subscription *subscription);
588
589 struct stasis_forward;
590
591 /*!
592  * \brief Create a subscription which forwards all messages from one topic to
593  * another.
594  *
595  * Note that the \a topic parameter of the invoked callback will the be the
596  * \a topic the message was sent to, not the topic the subscriber subscribed to.
597  *
598  * \param from_topic Topic to forward.
599  * \param to_topic Destination topic of forwarded messages.
600  * \return New forwarding subscription.
601  * \return \c NULL on error.
602  * \since 12
603  */
604 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
605         struct stasis_topic *to_topic);
606
607 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
608
609 /*!
610  * \brief Get the unique ID for the subscription.
611  *
612  * \param sub Subscription for which to get the unique ID.
613  * \return Unique ID for the subscription.
614  * \since 12
615  */
616 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
617
618 /*!
619  * \brief Returns whether a subscription is currently subscribed.
620  *
621  * Note that there may still be messages queued up to be dispatched to this
622  * subscription, but the stasis_subscription_final_message() has been enqueued.
623  *
624  * \param sub Subscription to check
625  * \return False (zero) if subscription is not subscribed.
626  * \return True (non-zero) if still subscribed.
627  */
628 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
629
630 /*!
631  * \brief Determine whether a message is the final message to be received on a subscription.
632  *
633  * \param sub Subscription on which the message was received.
634  * \param msg Message to check.
635  * \return zero if the provided message is not the final message.
636  * \return non-zero if the provided message is the final message.
637  * \since 12
638  */
639 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
640
641 /*! \addtogroup StasisTopicsAndMessages
642  * @{
643  */
644
645 /*!
646  * \brief Holds details about changes to subscriptions for the specified topic
647  * \since 12
648  */
649 struct stasis_subscription_change {
650         AST_DECLARE_STRING_FIELDS(
651                 AST_STRING_FIELD(uniqueid);     /*!< The unique ID associated with this subscription */
652                 AST_STRING_FIELD(description);  /*!< The description of the change to the subscription associated with the uniqueid */
653         );
654         struct stasis_topic *topic;             /*!< The topic the subscription is/was subscribing to */
655 };
656
657 /*!
658  * \brief Gets the message type for subscription change notices
659  * \return The stasis_message_type for subscription change notices
660  * \since 12
661  */
662 struct stasis_message_type *stasis_subscription_change_type(void);
663
664 /*! @} */
665
666 /*! @{ */
667
668 /*!
669  * \brief Pool for topic aggregation
670  */
671 struct stasis_topic_pool;
672
673 /*!
674  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
675  * \param pooled_topic Topic to which messages will be routed
676  * \return the new stasis_topic_pool
677  * \return \c NULL on failure
678  */
679 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
680
681 /*!
682  * \brief Find or create a topic in the pool
683  * \param pool Pool for which to get the topic
684  * \param topic_name Name of the topic to get
685  * \return The already stored or newly allocated topic
686  * \return \c NULL if the topic was not found and could not be allocated
687  */
688 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
689
690 /*! @} */
691
692 /*! \addtogroup StasisTopicsAndMessages
693  * @{
694  */
695
696 /*!
697  * \brief Message type for cache update messages.
698  * \return Message type for cache update messages.
699  * \since 12
700  */
701 struct stasis_message_type *stasis_cache_update_type(void);
702
703 /*!
704  * \brief Cache update message
705  * \since 12
706  */
707 struct stasis_cache_update {
708         /*! \brief Convenience reference to snapshot type */
709         struct stasis_message_type *type;
710         /*! \brief Old value from the cache */
711         struct stasis_message *old_snapshot;
712         /*! \brief New value */
713         struct stasis_message *new_snapshot;
714 };
715
716 /*!
717  * \brief Message type for clearing a message from a stasis cache.
718  * \since 12
719  */
720 struct stasis_message_type *stasis_cache_clear_type(void);
721
722 /*! @} */
723
724 /*! @{ */
725
726 /*!
727  * \brief A message cache, for use with \ref stasis_caching_topic.
728  * \since 12
729  */
730 struct stasis_cache;
731
732 /*! Cache entry used for calculating the aggregate snapshot. */
733 struct stasis_cache_entry;
734
735 /*!
736  * \brief A topic wrapper, which caches certain messages.
737  * \since 12
738  */
739 struct stasis_caching_topic;
740
741
742 /*!
743  * \brief Callback extract a unique identity from a snapshot message.
744  *
745  * This identity is unique to the underlying object of the snapshot, such as the
746  * UniqueId field of a channel.
747  *
748  * \param message Message to extract id from.
749  * \return String representing the snapshot's id.
750  * \return \c NULL if the message_type of the message isn't a handled snapshot.
751  * \since 12
752  */
753 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
754
755 /*!
756  * \brief Callback to calculate the aggregate cache entry.
757  * \since 12.2.0
758  *
759  * \param entry Cache entry to calculate a new aggregate snapshot.
760  * \param new_snapshot The shapshot that is being updated.
761  *
762  * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
763  * if a new aggregate could not be calculated because of error.
764  *
765  * \note An aggregate message is a combined representation of the local
766  * and remote entities publishing the message data.  e.g., An aggregate
767  * device state represents the combined device state from the local and
768  * any remote entities publishing state for a device.  e.g., An aggregate
769  * MWI message is the old/new MWI counts accumulated from the local and
770  * any remote entities publishing to a mailbox.
771  *
772  * \return New aggregate-snapshot calculated on success.
773  * Caller has a reference on return.
774  */
775 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
776
777 /*!
778  * \brief Callback to publish the aggregate cache entry message.
779  * \since 12.2.0
780  *
781  * \details
782  * Once an aggregate message is calculated.  This callback publishes the
783  * message so subscribers will know the new value of an aggregated state.
784  *
785  * \param topic The aggregate message may be published to this topic.
786  *        It is the topic to which the cache itself is subscribed.
787  * \param aggregate The aggregate shapshot message to publish.
788  *
789  * \note It is up to the function to determine if there is a better topic
790  * the aggregate message should be published over.
791  *
792  * \note An aggregate message is a combined representation of the local
793  * and remote entities publishing the message data.  e.g., An aggregate
794  * device state represents the combined device state from the local and
795  * any remote entities publishing state for a device.  e.g., An aggregate
796  * MWI message is the old/new MWI counts accumulated from the local and
797  * any remote entities publishing to a mailbox.
798  *
799  * \return Nothing
800  */
801 typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
802
803 /*!
804  * \brief Get the aggregate cache entry snapshot.
805  * \since 12.2.0
806  *
807  * \param entry Cache entry to get the aggregate snapshot.
808  *
809  * \note A reference is not given to the returned pointer so don't unref it.
810  *
811  * \note An aggregate message is a combined representation of the local
812  * and remote entities publishing the message data.  e.g., An aggregate
813  * device state represents the combined device state from the local and
814  * any remote entities publishing state for a device.  e.g., An aggregate
815  * MWI message is the old/new MWI counts accumulated from the local and
816  * any remote entities publishing to a mailbox.
817  *
818  * \retval Aggregate-snapshot in cache.
819  * \retval NULL if not present.
820  */
821 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
822
823 /*!
824  * \brief Get the local entity's cache entry snapshot.
825  * \since 12.2.0
826  *
827  * \param entry Cache entry to get the local entity's snapshot.
828  *
829  * \note A reference is not given to the returned pointer so don't unref it.
830  *
831  * \retval Internal-snapshot in cache.
832  * \retval NULL if not present.
833  */
834 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
835
836 /*!
837  * \brief Get a remote entity's cache entry snapshot by index.
838  * \since 12.2.0
839  *
840  * \param entry Cache entry to get a remote entity's snapshot.
841  * \param idx Which remote entity's snapshot to get.
842  *
843  * \note A reference is not given to the returned pointer so don't unref it.
844  *
845  * \retval Remote-entity-snapshot in cache.
846  * \retval NULL if not present.
847  */
848 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
849
850 /*!
851  * \brief Create a cache.
852  *
853  * This is the backend store for a \ref stasis_caching_topic. The cache is
854  * thread safe, allowing concurrent reads and writes.
855  *
856  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
857  *
858  * \param id_fn Callback to extract the id from a snapshot message.
859  *
860  * \retval New cache indexed by \a id_fn.
861  * \retval \c NULL on error
862  *
863  * \since 12
864  */
865 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
866
867 /*!
868  * \brief Create a cache.
869  *
870  * This is the backend store for a \ref stasis_caching_topic. The cache is
871  * thread safe, allowing concurrent reads and writes.
872  *
873  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
874  *
875  * \param id_fn Callback to extract the id from a snapshot message.
876  * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
877  * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
878  *
879  * \note An aggregate message is a combined representation of the local
880  * and remote entities publishing the message data.  e.g., An aggregate
881  * device state represents the combined device state from the local and
882  * any remote entities publishing state for a device.  e.g., An aggregate
883  * MWI message is the old/new MWI counts accumulated from the local and
884  * any remote entities publishing to a mailbox.
885  *
886  * \retval New cache indexed by \a id_fn.
887  * \retval \c NULL on error
888  *
889  * \since 12.2.0
890  */
891 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
892
893 /*!
894  * \brief Create a topic which monitors and caches messages from another topic.
895  *
896  * The idea is that some topics publish 'snapshots' of some other object's state
897  * that should be cached. When these snapshot messages are received, the cache
898  * is updated, and a stasis_cache_update() message is forwarded, which has both
899  * the original snapshot message and the new message.
900  *
901  * The returned object is AO2 managed, so ao2_cleanup() when done with it.
902  *
903  * \param original_topic Topic publishing snapshot messages.
904  * \param cache Backend cache in which to keep snapshots.
905  * \return New topic which changes snapshot messages to stasis_cache_update()
906  *         messages, and forwards all other messages from the original topic.
907  * \return \c NULL on error
908  * \since 12
909  */
910 struct stasis_caching_topic *stasis_caching_topic_create(
911         struct stasis_topic *original_topic, struct stasis_cache *cache);
912
913 /*!
914  * \brief Unsubscribes a caching topic from its upstream topic.
915  *
916  * This function returns immediately, so be sure to cleanup when
917  * stasis_subscription_final_message() is received.
918  *
919  * \param caching_topic Caching topic to unsubscribe
920  * \return \c NULL for convenience
921  * \since 12
922  */
923 struct stasis_caching_topic *stasis_caching_unsubscribe(
924         struct stasis_caching_topic *caching_topic);
925
926 /*!
927  * \brief Unsubscribes a caching topic from its upstream topic, blocking until
928  * all messages have been forwarded.
929  *
930  * See stasis_unsubscriben_and_join() for more info on when to use this as
931  * opposed to stasis_caching_unsubscribe().
932  *
933  * \param caching_topic Caching topic to unsubscribe
934  * \return \c NULL for convenience
935  * \since 12
936  */
937 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
938         struct stasis_caching_topic *caching_topic);
939
940 /*!
941  * \brief Returns the topic of cached events from a caching topics.
942  * \param caching_topic The caching topic.
943  * \return The topic that publishes cache update events, along with passthrough
944  *         events from the underlying topic.
945  * \return \c NULL if \a caching_topic is \c NULL.
946  * \since 12
947  */
948 struct stasis_topic *stasis_caching_get_topic(
949         struct stasis_caching_topic *caching_topic);
950
951 /*!
952  * \brief A message which instructs the caching topic to remove an entry from
953  * its cache.
954  *
955  * \param message Message representative of the cache entry that should be
956  *                cleared. This will become the data held in the
957  *                stasis_cache_clear message.
958  *
959  * \return Message which, when sent to a \ref stasis_caching_topic, will clear
960  *         the item from the cache.
961  * \return \c NULL on error.
962  * \since 12
963  */
964 struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
965
966 /*!
967  * \brief Retrieve an item from the cache for the ast_eid_default entity.
968  *
969  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
970  *
971  * \param cache The cache to query.
972  * \param type Type of message to retrieve.
973  * \param id Identity of the snapshot to retrieve.
974  *
975  * \retval Message from the cache.
976  * \retval \c NULL if message is not found.
977  *
978  * \since 12
979  */
980 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
981
982 /*!
983  * \brief Retrieve an item from the cache for a specific entity.
984  *
985  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
986  *
987  * \param cache The cache to query.
988  * \param type Type of message to retrieve.
989  * \param id Identity of the snapshot to retrieve.
990  * \param eid Specific entity id to retrieve.  NULL for aggregate.
991  *
992  * \note An aggregate message is a combined representation of the local
993  * and remote entities publishing the message data.  e.g., An aggregate
994  * device state represents the combined device state from the local and
995  * any remote entities publishing state for a device.  e.g., An aggregate
996  * MWI message is the old/new MWI counts accumulated from the local and
997  * any remote entities publishing to a mailbox.
998  *
999  * \retval Message from the cache.
1000  * \retval \c NULL if message is not found.
1001  *
1002  * \since 12.2.0
1003  */
1004 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
1005
1006 /*!
1007  * \brief Retrieve all matching entity items from the cache.
1008  * \since 12.2.0
1009  *
1010  * \param cache The cache to query.
1011  * \param type Type of message to retrieve.
1012  * \param id Identity of the snapshot to retrieve.
1013  *
1014  * \retval Container of matching items found.
1015  * \retval \c NULL if error.
1016  */
1017 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1018
1019 /*!
1020  * \brief Dump cached items to a subscription for the ast_eid_default entity.
1021  *
1022  * \param cache The cache to query.
1023  * \param type Type of message to dump (any type if \c NULL).
1024  *
1025  * \retval ao2_container containing all matches (must be unreffed by caller)
1026  * \retval \c NULL on allocation error
1027  *
1028  * \since 12
1029  */
1030 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
1031
1032 /*!
1033  * \brief Dump cached items to a subscription for a specific entity.
1034  * \since 12.2.0
1035  *
1036  * \param cache The cache to query.
1037  * \param type Type of message to dump (any type if \c NULL).
1038  * \param eid Specific entity id to retrieve.  NULL for aggregate.
1039  *
1040  * \retval ao2_container containing all matches (must be unreffed by caller)
1041  * \retval \c NULL on allocation error
1042  */
1043 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
1044
1045 /*!
1046  * \brief Dump all entity items from the cache to a subscription.
1047  * \since 12.2.0
1048  *
1049  * \param cache The cache to query.
1050  * \param type Type of message to dump (any type if \c NULL).
1051  *
1052  * \retval ao2_container containing all matches (must be unreffed by caller)
1053  * \retval \c NULL on allocation error
1054  */
1055 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
1056
1057 /*!
1058  * \brief Object type code for multi user object snapshots
1059  */
1060 enum stasis_user_multi_object_snapshot_type {
1061         STASIS_UMOS_CHANNEL = 0,     /*!< Channel Snapshots */
1062         STASIS_UMOS_BRIDGE,          /*!< Bridge Snapshots */
1063         STASIS_UMOS_ENDPOINT,        /*!< Endpoint Snapshots */
1064 };
1065
1066 /*! \brief Number of snapshot types */
1067 #define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
1068
1069 /*!
1070  * \brief Message type for custom user defined events with multi object blobs
1071  * \return The stasis_message_type for user event
1072  * \since 12.3.0
1073  */
1074 struct stasis_message_type *ast_multi_user_event_type(void);
1075
1076 /*!
1077  * \brief Create a stasis multi object blob
1078  * \since 12.3.0
1079  *
1080  * \details
1081  * Multi object blob can store a combination of arbitrary json values
1082  * (the blob) and also snapshots of various other system objects (such
1083  * as channels, bridges, etc) for delivery through a stasis message.
1084  * The multi object blob is first created, then optionally objects
1085  * are added to it, before being attached to a message and delivered
1086  * to stasis topic.
1087  *
1088  * \param blob Json blob
1089  *
1090  * \note When used for an ast_multi_user_event_type message, the
1091  * json blob should contain at minimum {eventname: name}.
1092  *
1093  * \retval ast_multi_object_blob* if succeeded
1094  * \retval NULL if creation failed
1095  */
1096 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
1097
1098 /*!
1099  * \brief Add an object to a multi object blob previously created
1100  * \since 12.3.0
1101  *
1102  * \param multi The multi object blob previously created
1103  * \param type Type code for the object such as channel, bridge, etc.
1104  * \param object Snapshot object of the type supplied to typename
1105  *
1106  * \return Nothing
1107  */
1108 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
1109
1110 /*!
1111  * \brief Create and publish a stasis message blob on a channel with it's snapshot
1112  * \since 12.3.0
1113  *
1114  * \details
1115  * For compatibility with app_userevent, this creates a multi object
1116  * blob message, attaches the channel snapshot to it, and publishes it
1117  * to the channel's topic.
1118  *
1119  * \param chan The channel to snapshot and publish event to
1120  * \param type The message type
1121  * \param blob A json blob to publish with the snapshot
1122  *
1123  * \return Nothing
1124  */
1125 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
1126
1127
1128 /*! @} */
1129
1130 /*! @{ */
1131
1132 /*!
1133  * \internal
1134  * \brief Log a message about invalid attempt to access a type.
1135  */
1136 void stasis_log_bad_type_access(const char *name);
1137
1138 /*!
1139  * \brief Boiler-plate messaging macro for defining public message types.
1140  *
1141  * \code
1142  *      STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
1143  *              .to_ami = foo_to_ami,
1144  *              .to_json = foo_to_json,
1145  *              .to_event = foo_to_event,
1146  *              );
1147  * \endcode
1148  *
1149  * \param name Name of message type.
1150  * \param ... Virtual table methods for messages of this type.
1151  * \since 12
1152  */
1153 #define STASIS_MESSAGE_TYPE_DEFN(name, ...)                             \
1154         static struct stasis_message_vtable _priv_ ## name ## _v = {    \
1155                 __VA_ARGS__                                             \
1156         };                                                              \
1157         static struct stasis_message_type *_priv_ ## name;              \
1158         struct stasis_message_type *name(void) {                        \
1159                 if (_priv_ ## name == NULL) {                           \
1160                         stasis_log_bad_type_access(#name);              \
1161                 }                                                       \
1162                 return _priv_ ## name;                                  \
1163         }
1164
1165 /*!
1166  * \brief Boiler-plate messaging macro for defining local message types.
1167  *
1168  * \code
1169  *      STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
1170  *              .to_ami = foo_to_ami,
1171  *              .to_json = foo_to_json,
1172  *              .to_event = foo_to_event,
1173  *              );
1174  * \endcode
1175  *
1176  * \param name Name of message type.
1177  * \param ... Virtual table methods for messages of this type.
1178  * \since 12
1179  */
1180 #define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...)                       \
1181         static struct stasis_message_vtable _priv_ ## name ## _v = {    \
1182                 __VA_ARGS__                                             \
1183         };                                                              \
1184         static struct stasis_message_type *_priv_ ## name;              \
1185         static struct stasis_message_type *name(void) {                 \
1186                 if (_priv_ ## name == NULL) {                           \
1187                         stasis_log_bad_type_access(#name);              \
1188                 }                                                       \
1189                 return _priv_ ## name;                                  \
1190         }
1191
1192 /*!
1193 * \brief Boiler-plate messaging macro for initializing message types.
1194  *
1195  * \code
1196  *      if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
1197  *              return -1;
1198  *      }
1199  * \endcode
1200  *
1201  * \param name Name of message type.
1202  * \return 0 if initialization is successful.
1203  * \return Non-zero on failure.
1204  * \since 12
1205  */
1206 #define STASIS_MESSAGE_TYPE_INIT(name)                                  \
1207         ({                                                              \
1208                 ast_assert(_priv_ ## name == NULL);                     \
1209                 stasis_message_type_create(#name,       \
1210                         &_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0;   \
1211         })
1212
1213 /*!
1214  * \brief Boiler-plate messaging macro for cleaning up message types.
1215  *
1216  * Note that if your type is defined in core instead of a loadable module, you
1217  * should call message type cleanup from an ast_register_cleanup() handler
1218  * instead of an ast_register_atexit() handler.
1219  *
1220  * The reason is that during an immediate shutdown, loadable modules (which may
1221  * refer to core message types) are not unloaded. While the atexit handlers are
1222  * run, there's a window of time where a module subscription might reference a
1223  * core message type after it's been cleaned up. Which is bad.
1224  *
1225  * \param name Name of message type.
1226  * \since 12
1227  */
1228 #define STASIS_MESSAGE_TYPE_CLEANUP(name)       \
1229         ({                                      \
1230                 ao2_cleanup(_priv_ ## name);    \
1231                 _priv_ ## name = NULL;          \
1232         })
1233
1234 /*! @} */
1235
1236 /*! @{ */
1237
1238 /*!
1239  * \brief Initialize the Stasis subsystem.
1240  * \return 0 on success.
1241  * \return Non-zero on error.
1242  * \since 12
1243  */
1244 int stasis_init(void);
1245
1246 /*! @} */
1247
1248 /*! @{ */
1249
1250 /*!
1251  * \internal
1252  * \brief called by stasis_init() for cache initialization.
1253  * \return 0 on success.
1254  * \return Non-zero on error.
1255  * \since 12
1256  */
1257 int stasis_cache_init(void);
1258
1259 /*!
1260  * \internal
1261  * \brief called by stasis_init() for config initialization.
1262  * \return 0 on success.
1263  * \return Non-zero on error.
1264  * \since 12
1265  */
1266 int stasis_config_init(void);
1267
1268 /*! @} */
1269
1270 /*!
1271  * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
1272  *
1273  * This group contains the topics, messages and corresponding message types
1274  * found within Asterisk.
1275  */
1276
1277 #endif /* _ASTERISK_STASIS_H */