1d35bf87c9da602c76295e78e02557bb91364d27
[asterisk/asterisk.git] / include / asterisk / stasis.h
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #ifndef _ASTERISK_STASIS_H
20 #define _ASTERISK_STASIS_H
21
22 /*! \file
23  *
24  * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25  * detailed documentation.
26  *
27  * \author David M. Lee, II <dlee@digium.com>
28  * \since 12
29  *
30  * \page stasis Stasis Message Bus API
31  *
32  * \par Intro
33  *
34  * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35  * within Asterisk. It is designed to be:
36  *  - Loosely coupled; new message types can be added in seperate modules.
37  *  - Easy to use; publishing and subscribing are straightforward operations.
38  *
39  * There are three main concepts for using the Stasis Message Bus:
40  *  - \ref stasis_message
41  *  - \ref stasis_topic
42  *  - \ref stasis_subscription
43  *
44  * \par stasis_message
45  *
46  * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47  * that are sent on the bus. These messages have:
48  *  - a type (as defined by a \ref stasis_message_type)
49  *  - a value - a \c void pointer to an AO2 object
50  *  - a timestamp when it was created
51  *
52  * Once a \ref stasis_message has been created, it is immutable and cannot
53  * change. The same goes for the value of the message (although this cannot be
54  * enforced in code). Messages themselves are reference-counted, AO2 objects,
55  * along with their values. By being both reference counted and immutable,
56  * messages can be shared throughout the system without any concerns for
57  * threading. (Well, the objects must be allocated with \ref
58  * AO2_ALLOC_OPT_LOCK_MUTEX so that the reference counting operations are thread
59  * safe. But other than that, no worries).
60  *
61  * The type of a message is defined by an instance of \ref stasis_message_type,
62  * which can be created by calling stasis_message_type_create(). Message types
63  * are named, which is useful in debugging. It is recommended that the string
64  * name for a message type match the name of the struct that's stored in the
65  * message. For example, name for \ref stasis_cache_update's message type is \c
66  * "stasis_cache_update".
67  *
68  * \par stasis_topic
69  *
70  * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
71  * subscribed, and \ref stasis_message's may be published. Any message published
72  * to the topic is dispatched to all of its subscribers. The topic itself may be
73  * named, which is useful in debugging.
74  *
75  * Topics themselves are reference counted objects. Since topics are referred to
76  * by their subscibers, they will not be freed until all of their subscribers
77  * have unsubscribed. Topics are also thread safe, so no worries about
78  * publishing/subscribing/unsubscribing to a topic concurrently from multiple
79  * threads. It's also designed to handle the case of unsubscribing from a topic
80  * from within the subscription handler.
81  *
82  * \par Forwarding
83  *
84  * There is one special case of topics that's worth noting: forwarding
85  * messages. It's a fairly common use case to want to forward all the messages
86  * published on one topic to another one (for example, an aggregator topic that
87  * publishes all the events from a set of other topics). This can be
88  * accomplished easily using stasis_forward_all(). This sets up the forwarding
89  * between the two topics, and returns a \ref stasis_subscription, which can be
90  * unsubscribed to stop the forwarding.
91  *
92  * \par Caching
93  *
94  * Another common use case is to want to cache certain messages that are
95  * published on the bus. Usually these events are snapshots of the current state
96  * in the system, and it's desirable to query that state from the cache without
97  * locking the original object. It's also desirable for subscribers of the
98  * caching topic to receive messages that have both the old cache value and the
99  * new value being put into the cache. For this, we have
100  * stasis_caching_topic_create(), providing it with the topic which publishes
101  * the messages that you wish to cache, and a function that can identify
102  * cacheable messages.
103  *
104  * The returned \ref stasis_caching_topic provides a topic that forwards
105  * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
106  * stasis_cache_update message which provides the old snapshot (or \c NULL if
107  * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
108  * removed from the cache). A stasis_cache_clear_create() message must be sent
109  * to the topic in order to remove entries from the cache.
110  *
111  * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
112  * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
113  * stasis_caching_topic will not be freed until after it has been unsubscribed,
114  * and all other ao2_ref()'s have been cleaned up.
115  *
116  * \par stasis_subscriber
117  *
118  * Any topic may be subscribed to by simply providing stasis_subscribe() the
119  * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
120  * data that is passed back to the handler. Invocations on the subscription's
121  * handler are serialized, but different invocations may occur on different
122  * threads (this usually isn't important unless you use thread locals or
123  * something similar).
124  *
125  * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
126  * stasis_subscription. Due to cyclic references, the \ref
127  * stasis_subscription will not be freed until after it has been unsubscribed,
128  * and all other ao2_ref()'s have been cleaned up.
129  */
130
131 #include "asterisk/utils.h"
132
133 /*! @{ */
134
135 /*!
136  * \brief Metadata about a \ref stasis_message.
137  * \since 12
138  */
139 struct stasis_message_type;
140
141 /*!
142  * \brief Register a new message type.
143  *
144  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
145  * with it.
146  *
147  * \param name Name of the new type.
148  * \return Pointer to the new type.
149  * \return \c NULL on error.
150  * \since 12
151  */
152 struct stasis_message_type *stasis_message_type_create(const char *name);
153
154 /*!
155  * \brief Gets the name of a given message type
156  * \param type The type to get.
157  * \return Name of the type.
158  * \return \c NULL if \a type is \c NULL.
159  * \since 12
160  */
161 const char *stasis_message_type_name(const struct stasis_message_type *type);
162
163 /*!
164  * \brief Opaque type for a Stasis message.
165  * \since 12
166  */
167 struct stasis_message;
168
169 /*!
170  * \brief Create a new message.
171  *
172  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
173  * with it. Messages are also immutable, and must not be modified after they
174  * are initialized. Especially the \a data in the message.
175  *
176  * \param type Type of the message
177  * \param data Immutable data that is the actual contents of the message
178  * \return New message
179  * \return \c NULL on error
180  * \since 12
181  */
182 struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
183
184 /*!
185  * \brief Get the message type for a \ref stasis_message.
186  * \param msg Message to type
187  * \return Type of \a msg
188  * \return \c NULL if \a msg is \c NULL.
189  * \since 12
190  */
191 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
192
193 /*!
194  * \brief Get the data contained in a message.
195  * \param msg Message.
196  * \return Immutable data pointer
197  * \return \c NULL if msg is \c NULL.
198  * \since 12
199  */
200 void *stasis_message_data(const struct stasis_message *msg);
201
202 /*!
203  * \brief Get the time when a message was created.
204  * \param msg Message.
205  * \return Pointer to the \a timeval when the message was created.
206  * \return \c NULL if msg is \c NULL.
207  * \since 12
208  */
209 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
210
211 /*! @} */
212
213 /*! @{ */
214
215 /*!
216  * \brief A topic to which messages may be posted, and subscribers, well, subscribe
217  * \since 12
218  */
219 struct stasis_topic;
220
221 /*!
222  * \brief Create a new topic.
223  * \param name Name of the new topic.
224  * \return New topic instance.
225  * \return \c NULL on error.
226  * \since 12
227  */
228 struct stasis_topic *stasis_topic_create(const char *name);
229
230 /*!
231  * \brief Return the name of a topic.
232  * \param topic Topic.
233  * \return Name of the topic.
234  * \return \c NULL if topic is \c NULL.
235  * \since 12
236  */
237 const char *stasis_topic_name(const struct stasis_topic *topic);
238
239 /*!
240  * \brief Publish a message to a topic's subscribers.
241  * \param topic Topic.
242  * \param message Message to publish.
243  * \since 12
244  */
245 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
246
247 /*!
248  * \brief Publish a message from a specified topic to all the subscribers of a
249  * possibly different topic.
250  * \param topic Topic to publish message to.
251  * \param topic Original topic message was from.
252  * \param message Message
253  * \since 12
254  */
255 void stasis_forward_message(struct stasis_topic *topic,
256                             struct stasis_topic *publisher_topic,
257                             struct stasis_message *message);
258
259 /*! @} */
260
261 /*! @{ */
262
263 /*!
264  * \brief Opaque type for a Stasis subscription.
265  * \since 12
266  */
267 struct stasis_subscription;
268
269 /*!
270  * \brief Callback function type for Stasis subscriptions.
271  * \param data Data field provided with subscription.
272  * \param topic Topic to which the message was published.
273  * \param message Published message.
274  * \since 12
275  */
276 typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
277
278 /*!
279  * \brief Create a subscription.
280  *
281  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
282  * up this reference), the subscription must be explicitly unsubscribed from its
283  * topic using stasis_unsubscribe().
284  *
285  * The invocations of the callback are serialized, but may not always occur on
286  * the same thread. The invocation order of different subscriptions is
287  * unspecified.
288  *
289  * \param topic Topic to subscribe to.
290  * \param callback Callback function for subscription messages.
291  * \param data Data to be passed to the callback, in addition to the message.
292  * \return New \ref stasis_subscription object.
293  * \return \c NULL on error.
294  * \since 12
295  */
296 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
297                                              stasis_subscription_cb callback,
298                                              void *data);
299
300 /*!
301  * \brief Cancel a subscription.
302  *
303  * Note that in an asynchronous system, there may still be messages queued or
304  * in transit to the subscription's callback. These will still be delivered.
305  * There will be a final 'SubscriptionCancelled' message, indicating the
306  * delivery of the final message.
307  *
308  * \param subscription Subscription to cancel.
309  * \since 12
310  */
311 void stasis_unsubscribe(struct stasis_subscription *subscription);
312
313 /*!
314  * \brief Create a subscription which forwards all messages from one topic to
315  * another.
316  *
317  * Note that the \a topic parameter of the invoked callback will the be \a topic
318  * the message was sent to, not the topic the subscriber subscribed to.
319  *
320  * \param from_topic Topic to forward.
321  * \param to_topic Destination topic of forwarded messages.
322  * \return New forwarding subscription.
323  * \return \c NULL on error.
324  * \since 12
325  */
326 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
327
328 /*!
329  * \brief Get the unique ID for the subscription.
330  *
331  * \param sub Subscription for which to get the unique ID.
332  * \return Unique ID for the subscription.
333  * \since 12
334  */
335 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
336
337 /*!
338  * \brief Returns whether a subscription is currently subscribed.
339  *
340  * Note that there may still be messages queued up to be dispatched to this
341  * subscription, but the stasis_subscription_final_message() has been enqueued.
342  *
343  * \param sub Subscription to check
344  * \return False (zero) if subscription is not subscribed.
345  * \return True (non-zero) if still subscribed.
346  */
347 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
348
349 /*!
350  * \brief Determine whether a message is the final message to be received on a subscription.
351  *
352  * \param sub Subscription on which the message was received.
353  * \param msg Message to check.
354  * \return zero if the provided message is not the final message.
355  * \return non-zero if the provided message is the final message.
356  * \since 12
357  */
358 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
359
360 /*!
361  * \brief Holds details about changes to subscriptions for the specified topic
362  * \since 12
363  */
364 struct stasis_subscription_change {
365         AST_DECLARE_STRING_FIELDS(
366                 AST_STRING_FIELD(uniqueid);     /*!< The unique ID associated with this subscription */
367                 AST_STRING_FIELD(description);  /*!< The description of the change to the subscription associated with the uniqueid */
368         );
369         struct stasis_topic *topic;             /*!< The topic the subscription is/was subscribing to */
370 };
371
372 /*!
373  * \brief Gets the message type for subscription change notices
374  * \return The stasis_message_type for subscription change notices
375  * \since 12
376  */
377 struct stasis_message_type *stasis_subscription_change(void);
378
379 /*! @} */
380
381 /*! @{ */
382
383 /*!
384  * \brief A topic wrapper, which caches certain messages.
385  * \since 12
386  */
387 struct stasis_caching_topic;
388
389 /*!
390  * \brief Message type for cache update messages.
391  * \return Message type for cache update messages.
392  * \since 12
393  */
394 struct stasis_message_type *stasis_cache_update(void);
395
396 /*!
397  * \brief Cache update message
398  * \since 12
399  */
400 struct stasis_cache_update {
401         /*! \brief Topic that published \c new_snapshot */
402         struct stasis_topic *topic;
403         /*! \brief Convenience reference to snapshot type */
404         struct stasis_message_type *type;
405         /*! \brief Old value from the cache */
406         struct stasis_message *old_snapshot;
407         /*! \brief New value */
408         struct stasis_message *new_snapshot;
409 };
410
411 /*!
412  * \brief A message which instructs the caching topic to remove an entry from its cache.
413  * \param type Message type.
414  * \param id Unique id of the snapshot to clear.
415  * \return Message which, when sent to the \a topic, will clear the item from the cache.
416  * \return \c NULL on error.
417  * \since 12
418  */
419 struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id);
420
421 /*!
422  * \brief Callback extract a unique identity from a snapshot message.
423  *
424  * This identity is unique to the underlying object of the snapshot, such as the
425  * UniqueId field of a channel.
426  *
427  * \param message Message to extract id from.
428  * \return String representing the snapshot's id.
429  * \return \c NULL if the message_type of the message isn't a handled snapshot.
430  * \since 12
431  */
432 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
433
434 /*!
435  * \brief Create a topic which monitors and caches messages from another topic.
436  *
437  * The idea is that some topics publish 'snapshots' of some other object's state
438  * that should be cached. When these snapshot messages are received, the cache
439  * is updated, and a stasis_cache_update() message is forwarded, which has both
440  * the original snapshot message and the new message.
441  *
442  * \param original_topic Topic publishing snapshot messages.
443  * \param id_fn Callback to extract the id from a snapshot message.
444  * \return New topic which changes snapshot messages to stasis_cache_update()
445  *         messages, and forwards all other messages from the original topic.
446  * \since 12
447  */
448 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
449
450 /*!
451  * Unsubscribes a caching topic from its upstream topic.
452  * \param caching_topic Caching topic to unsubscribe
453  * \since 12
454  */
455 void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
456
457 /*!
458  * \brief Returns the topic of cached events from a caching topics.
459  * \param caching_topic The caching topic.
460  * \return The topic that publishes cache update events, along with passthrough events
461  *         from the underlying topic.
462  * \return \c NULL if \a caching_topic is \c NULL.
463  * \since 12
464  */
465 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
466
467 /*!
468  * \brief Retrieve an item from the cache.
469  * \param caching_topic The topic returned from stasis_caching_topic_create().
470  * \param type Type of message to retrieve.
471  * \param id Identity of the snapshot to retrieve.
472  * \return Message from the cache. The cache still owns the message, so
473  *         ao2_ref() if you want to keep it.
474  * \return \c NULL if message is not found.
475  * \since 12
476  */
477 struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic,
478                                         struct stasis_message_type *type,
479                                         const char *id);
480
481 /*!
482  * \brief Dump cached items to a subscription
483  * \param caching_topic The topic returned from stasis_caching_topic_create().
484  * \param type Type of message to dump (any type if NULL).
485  * \return ao2_container containing all matches (must be unreffed by caller)
486  * \return NULL on allocation error
487  * \since 12
488  */
489 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
490                                         struct stasis_message_type *type);
491
492 /*! @} */
493
494 /*! @{ */
495
496 /*!
497  * \brief Initialize the Stasis subsystem
498  * \return 0 on success.
499  * \return Non-zero on error.
500  * \since 12
501  */
502 int stasis_init(void);
503
504 /*!
505  * \private
506  * \brief called by stasis_init() for cache initialization.
507  * \return 0 on success.
508  * \return Non-zero on error.
509  * \since 12
510  */
511 int stasis_cache_init(void);
512
513 /*! @} */
514
515 #endif /* _ASTERISK_STASIS_H */