stasis: Add methods to allow for synchronous publishing to subscriber
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/vector.h"
41
42 /*!
43  * \page stasis-impl Stasis Implementation Notes
44  *
45  * \par Reference counting
46  *
47  * Stasis introduces a number of objects, which are tightly related to one
48  * another. Because we rely on ref-counting for memory management, understanding
49  * these relationships is important to understanding this code.
50  *
51  * \code{.txt}
52  *
53  *   stasis_topic <----> stasis_subscription
54  *             ^          ^
55  *              \        /
56  *               \      /
57  *               dispatch
58  *                  |
59  *                  |
60  *                  v
61  *            stasis_message
62  *                  |
63  *                  |
64  *                  v
65  *          stasis_message_type
66  *
67  * \endcode
68  *
69  * The most troubling thing in this chart is the cyclic reference between
70  * stasis_topic and stasis_subscription. This is both unfortunate, and
71  * necessary. Topics need the subscription in order to dispatch messages;
72  * subscriptions need the topics to unsubscribe and check subscription status.
73  *
74  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
75  * topic's reference to a subscription. When the subcription is destroyed, it
76  * will remove its reference to the topic.
77  *
78  * This means that until a subscription has be explicitly unsubscribed, it will
79  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
80  * The destructors of both have assertions regarding this to catch ref-counting
81  * problems where a subscription or topic has had an extra ao2_cleanup().
82  *
83  * The \ref dispatch object is a transient object, which is posted to a
84  * subscription's taskprocessor to send a message to the subscriber. They have
85  * short life cycles, allocated on one thread, destroyed on another.
86  *
87  * During shutdown, or the deletion of a domain object, there are a flurry of
88  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
89  * are processed. Any one of these cleanups could be the one to actually destroy
90  * a given object, so care must be taken to ensure that an object isn't
91  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
92  * that might happen when a RAII_VAR() goes out of scope.
93  *
94  * \par Typical life cycles
95  *
96  *  \li stasis_topic - There are several topics which live for the duration of
97  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
98  *      are actually fed by shorter-lived topics whose lifetime is associated
99  *      with some domain object (like ast_channel_topic() for a given
100  *      ast_channel).
101  *
102  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
103  *      topics, for similar reasons.
104  *
105  *  \li dispatch - Very short lived; just long enough to post a message to a
106  *      subscriber.
107  *
108  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
109  *      irrelevant. Messages are strictly data and have no behavior associated
110  *      with them, so it doesn't really matter if/when they are destroyed. By
111  *      design, a component could hold a ref to a message forever without any
112  *      ill consequences (aside from consuming more memory).
113  *
114  *  \li stasis_message_type - Long life cycles, typically only destroyed on
115  *      module unloading or _clean_ process exit.
116  *
117  * \par Subscriber shutdown sequencing
118  *
119  * Subscribers are sensitive to shutdown sequencing, specifically in how the
120  * reference message types. This is fully detailed on the wiki at
121  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
122  *
123  * In short, the lifetime of the \a data (and \a callback, if in a module) must
124  * be held until the stasis_subscription_final_message() has been received.
125  * Depending on the structure of the subscriber code, this can be handled by
126  * using stasis_subscription_final_message() to free resources on the final
127  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
128  * block until the unsubscribe has completed.
129  */
130
131 /*! Initial size of the subscribers list. */
132 #define INITIAL_SUBSCRIBERS_MAX 4
133
134 /*! The number of buckets to use for topic pools */
135 #define TOPIC_POOL_BUCKETS 57
136
137 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
138
139 /*! \internal */
140 struct stasis_topic {
141         char *name;
142         /*! Variable length array of the subscribers */
143         AST_VECTOR(, struct stasis_subscription *) subscribers;
144
145         /*! Topics forwarding into this topic */
146         AST_VECTOR(, struct stasis_topic *) upstream_topics;
147 };
148
149 /* Forward declarations for the tightly-coupled subscription object */
150 static int topic_add_subscription(struct stasis_topic *topic,
151         struct stasis_subscription *sub);
152
153 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
154
155 /*! \brief Lock two topics. */
156 #define topic_lock_both(topic1, topic2) \
157         do { \
158                 ao2_lock(topic1); \
159                 while (ao2_trylock(topic2)) { \
160                         AO2_DEADLOCK_AVOIDANCE(topic1); \
161                 } \
162         } while (0)
163
164 static void topic_dtor(void *obj)
165 {
166         struct stasis_topic *topic = obj;
167
168         /* Subscribers hold a reference to topics, so they should all be
169          * unsubscribed before we get here. */
170         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
171
172         ast_free(topic->name);
173         topic->name = NULL;
174
175         AST_VECTOR_FREE(&topic->subscribers);
176         AST_VECTOR_FREE(&topic->upstream_topics);
177 }
178
179 struct stasis_topic *stasis_topic_create(const char *name)
180 {
181         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
182         int res = 0;
183
184         topic = ao2_alloc(sizeof(*topic), topic_dtor);
185
186         if (!topic) {
187                 return NULL;
188         }
189
190         topic->name = ast_strdup(name);
191         if (!topic->name) {
192                 return NULL;
193         }
194
195         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
196         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
197
198         if (res != 0) {
199                 return NULL;
200         }
201
202         ao2_ref(topic, +1);
203         return topic;
204 }
205
206 const char *stasis_topic_name(const struct stasis_topic *topic)
207 {
208         return topic->name;
209 }
210
211 /*! \internal */
212 struct stasis_subscription {
213         /*! Unique ID for this subscription */
214         char uniqueid[AST_UUID_STR_LEN];
215         /*! Topic subscribed to. */
216         struct stasis_topic *topic;
217         /*! Mailbox for processing incoming messages. */
218         struct ast_taskprocessor *mailbox;
219         /*! Callback function for incoming message processing. */
220         stasis_subscription_cb callback;
221         /*! Data pointer to be handed to the callback. */
222         void *data;
223
224         /*! Lock for completion flags \c final_message_{rxed,processed}. */
225         ast_mutex_t join_lock;
226         /*! Condition for joining with subscription. */
227         ast_cond_t join_cond;
228         /*! Flag set when final message for sub has been received.
229          *  Be sure join_lock is held before reading/setting. */
230         int final_message_rxed;
231         /*! Flag set when final message for sub has been processed.
232          *  Be sure join_lock is held before reading/setting. */
233         int final_message_processed;
234 };
235
236 static void subscription_dtor(void *obj)
237 {
238         struct stasis_subscription *sub = obj;
239
240         /* Subscriptions need to be manually unsubscribed before destruction
241          * b/c there's a cyclic reference between topics and subscriptions */
242         ast_assert(!stasis_subscription_is_subscribed(sub));
243         /* If there are any messages in flight to this subscription; that would
244          * be bad. */
245         ast_assert(stasis_subscription_is_done(sub));
246
247         ao2_cleanup(sub->topic);
248         sub->topic = NULL;
249         ast_taskprocessor_unreference(sub->mailbox);
250         sub->mailbox = NULL;
251         ast_mutex_destroy(&sub->join_lock);
252         ast_cond_destroy(&sub->join_cond);
253 }
254
255 /*!
256  * \brief Invoke the subscription's callback.
257  * \param sub Subscription to invoke.
258  * \param topic Topic message was published to.
259  * \param message Message to send.
260  */
261 static void subscription_invoke(struct stasis_subscription *sub,
262                                   struct stasis_message *message)
263 {
264         /* Notify that the final message has been received */
265         if (stasis_subscription_final_message(sub, message)) {
266                 SCOPED_MUTEX(lock, &sub->join_lock);
267                 sub->final_message_rxed = 1;
268                 ast_cond_signal(&sub->join_cond);
269         }
270
271         /* Since sub is mostly immutable, no need to lock sub */
272         sub->callback(sub->data, sub, message);
273
274         /* Notify that the final message has been processed */
275         if (stasis_subscription_final_message(sub, message)) {
276                 SCOPED_MUTEX(lock, &sub->join_lock);
277                 sub->final_message_processed = 1;
278                 ast_cond_signal(&sub->join_cond);
279         }
280 }
281
282 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
283 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
284
285 struct stasis_subscription *internal_stasis_subscribe(
286         struct stasis_topic *topic,
287         stasis_subscription_cb callback,
288         void *data,
289         int needs_mailbox)
290 {
291         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
292
293         if (!topic) {
294                 return NULL;
295         }
296
297         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
298         if (!sub) {
299                 return NULL;
300         }
301
302         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
303
304         if (needs_mailbox) {
305                 /* With a small number of subscribers, a thread-per-sub is
306                  * acceptable. If our usage changes so that we have larger
307                  * numbers of subscribers, we'll probably want to consider
308                  * a threadpool. We had that originally, but with so few
309                  * subscribers it was actually a performance loss instead of
310                  * a gain.
311                  */
312                 sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
313                         TPS_REF_DEFAULT);
314                 if (!sub->mailbox) {
315                         return NULL;
316                 }
317                 ast_taskprocessor_set_local(sub->mailbox, sub);
318                 /* Taskprocessor has a reference */
319                 ao2_ref(sub, +1);
320         }
321
322         ao2_ref(topic, +1);
323         sub->topic = topic;
324         sub->callback = callback;
325         sub->data = data;
326         ast_mutex_init(&sub->join_lock);
327         ast_cond_init(&sub->join_cond, NULL);
328
329         if (topic_add_subscription(topic, sub) != 0) {
330                 return NULL;
331         }
332         send_subscription_subscribe(topic, sub);
333
334         ao2_ref(sub, +1);
335         return sub;
336 }
337
338 struct stasis_subscription *stasis_subscribe(
339         struct stasis_topic *topic,
340         stasis_subscription_cb callback,
341         void *data)
342 {
343         return internal_stasis_subscribe(topic, callback, data, 1);
344 }
345
346 static int sub_cleanup(void *data)
347 {
348         struct stasis_subscription *sub = data;
349         ao2_cleanup(sub);
350         return 0;
351 }
352
353 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
354 {
355         /* The subscription may be the last ref to this topic. Hold
356          * the topic ref open until after the unlock. */
357         RAII_VAR(struct stasis_topic *, topic,
358                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
359
360         if (!sub) {
361                 return NULL;
362         }
363
364         /* We have to remove the subscription first, to ensure the unsubscribe
365          * is the final message */
366         if (topic_remove_subscription(sub->topic, sub) != 0) {
367                 ast_log(LOG_ERROR,
368                         "Internal error: subscription has invalid topic\n");
369                 return NULL;
370         }
371
372         /* Now let everyone know about the unsubscribe */
373         send_subscription_unsubscribe(topic, sub);
374
375         /* When all that's done, remove the ref the mailbox has on the sub */
376         if (sub->mailbox) {
377                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
378         }
379
380         /* Unsubscribing unrefs the subscription */
381         ao2_cleanup(sub);
382         return NULL;
383 }
384
385 void stasis_subscription_join(struct stasis_subscription *subscription)
386 {
387         if (subscription) {
388                 SCOPED_MUTEX(lock, &subscription->join_lock);
389                 /* Wait until the processed flag has been set */
390                 while (!subscription->final_message_processed) {
391                         ast_cond_wait(&subscription->join_cond,
392                                 &subscription->join_lock);
393                 }
394         }
395 }
396
397 int stasis_subscription_is_done(struct stasis_subscription *subscription)
398 {
399         if (subscription) {
400                 SCOPED_MUTEX(lock, &subscription->join_lock);
401                 return subscription->final_message_rxed;
402         }
403
404         /* Null subscription is about as done as you can get */
405         return 1;
406 }
407
408 struct stasis_subscription *stasis_unsubscribe_and_join(
409         struct stasis_subscription *subscription)
410 {
411         if (!subscription) {
412                 return NULL;
413         }
414
415         /* Bump refcount to hold it past the unsubscribe */
416         ao2_ref(subscription, +1);
417         stasis_unsubscribe(subscription);
418         stasis_subscription_join(subscription);
419         /* Now decrement the refcount back */
420         ao2_cleanup(subscription);
421         return NULL;
422 }
423
424 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
425 {
426         if (sub) {
427                 size_t i;
428                 struct stasis_topic *topic = sub->topic;
429                 SCOPED_AO2LOCK(lock_topic, topic);
430
431                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
432                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
433                                 return 1;
434                         }
435                 }
436         }
437
438         return 0;
439 }
440
441 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
442 {
443         return sub->uniqueid;
444 }
445
446 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
447 {
448         struct stasis_subscription_change *change;
449         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
450                 return 0;
451         }
452
453         change = stasis_message_data(msg);
454         if (strcmp("Unsubscribe", change->description)) {
455                 return 0;
456         }
457
458         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
459                 return 0;
460         }
461
462         return 1;
463 }
464
465 /*!
466  * \brief Add a subscriber to a topic.
467  * \param topic Topic
468  * \param sub Subscriber
469  * \return 0 on success
470  * \return Non-zero on error
471  */
472 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
473 {
474         size_t idx;
475         SCOPED_AO2LOCK(lock, topic);
476
477         /* The reference from the topic to the subscription is shared with
478          * the owner of the subscription, which will explicitly unsubscribe
479          * to release it.
480          *
481          * If we bumped the refcount here, the owner would have to unsubscribe
482          * and cleanup, which is a bit awkward. */
483         AST_VECTOR_APPEND(&topic->subscribers, sub);
484
485         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
486                 topic_add_subscription(
487                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
488         }
489
490         return 0;
491 }
492
493 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
494 {
495         size_t idx;
496         SCOPED_AO2LOCK(lock_topic, topic);
497
498         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
499                 topic_remove_subscription(
500                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
501         }
502
503         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
504                 AST_VECTOR_ELEM_CLEANUP_NOOP);
505 }
506
507 /*!
508  * \internal \brief Dispatch a message to a subscriber asynchronously
509  * \param local \ref ast_taskprocessor_local object
510  * \return 0
511  */
512 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
513 {
514         struct stasis_subscription *sub = local->local_data;
515         struct stasis_message *message = local->data;
516
517         subscription_invoke(sub, message);
518         ao2_cleanup(message);
519
520         return 0;
521 }
522
523 /*!
524  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
525  * a published message to a subscriber
526  */
527 struct sync_task_data {
528         ast_mutex_t lock;
529         ast_cond_t cond;
530         int complete;
531         void *task_data;
532 };
533
534 /*!
535  * \internal \brief Dispatch a message to a subscriber synchronously
536  * \param local \ref ast_taskprocessor_local object
537  * \return 0
538  */
539 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
540 {
541         struct stasis_subscription *sub = local->local_data;
542         struct sync_task_data *std = local->data;
543         struct stasis_message *message = std->task_data;
544
545         subscription_invoke(sub, message);
546         ao2_cleanup(message);
547
548         ast_mutex_lock(&std->lock);
549         std->complete = 1;
550         ast_cond_signal(&std->cond);
551         ast_mutex_unlock(&std->lock);
552
553         return 0;
554 }
555
556 /*!
557  * \internal \brief Dispatch a message to a subscriber
558  * \param sub The subscriber to dispatch to
559  * \param message The message to send
560  * \param synchronous If non-zero, synchronize on the subscriber receiving
561  * the message
562  */
563 static void dispatch_message(struct stasis_subscription *sub,
564         struct stasis_message *message,
565         int synchronous)
566 {
567         if (!sub->mailbox) {
568                 /* Dispatch directly */
569                 subscription_invoke(sub, message);
570                 return;
571         }
572
573         /* Bump the message for the taskprocessor push. This will get de-ref'd
574          * by the task processor callback.
575          */
576         ao2_bump(message);
577         if (!synchronous) {
578                 if (ast_taskprocessor_push_local(sub->mailbox,
579                                                      dispatch_exec_async,
580                                                      message) != 0) {
581                         /* Push failed; ugh. */
582                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
583                         ao2_cleanup(message);
584                 }
585         } else {
586                 struct sync_task_data std;
587
588                 ast_mutex_init(&std.lock);
589                 ast_cond_init(&std.cond, NULL);
590                 std.complete = 0;
591                 std.task_data = message;
592
593                 if (ast_taskprocessor_push_local(sub->mailbox,
594                                                      dispatch_exec_sync,
595                                                      &std)) {
596                         /* Push failed; ugh. */
597                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
598                         ao2_cleanup(message);
599                         return;
600                 }
601
602                 ast_mutex_lock(&std.lock);
603                 while (!std.complete) {
604                         ast_cond_wait(&std.cond, &std.lock);
605                 }
606                 ast_mutex_unlock(&std.lock);
607
608                 ast_mutex_destroy(&std.lock);
609                 ast_cond_destroy(&std.cond);
610         }
611 }
612
613 /*!
614  * \internal \brief Publish a message to a topic's subscribers
615  * \brief topic The topic to publish to
616  * \brief message The message to publish
617  * \brief sync_sub An optional subscriber of the topic to publish synchronously
618  * to
619  */
620 static void publish_msg(struct stasis_topic *topic,
621         struct stasis_message *message, struct stasis_subscription *sync_sub)
622 {
623         size_t i;
624
625         ast_assert(topic != NULL);
626         ast_assert(message != NULL);
627
628         /*
629          * The topic may be unref'ed by the subscription invocation.
630          * Make sure we hold onto a reference while dispatching.
631          */
632         ao2_ref(topic, +1);
633         ao2_lock(topic);
634         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
635                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
636
637                 ast_assert(sub != NULL);
638
639                 dispatch_message(sub, message, (sub == sync_sub));
640         }
641         ao2_unlock(topic);
642         ao2_ref(topic, -1);
643 }
644
645 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
646 {
647         publish_msg(topic, message, NULL);
648 }
649
650 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
651 {
652         ast_assert(sub != NULL);
653
654         publish_msg(sub->topic, message, sub);
655 }
656
657 /*!
658  * \brief Forwarding information
659  *
660  * Any message posted to \a from_topic is forwarded to \a to_topic.
661  *
662  * In cases where both the \a from_topic and \a to_topic need to be locked,
663  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
664  */
665 struct stasis_forward {
666         /*! Originating topic */
667         struct stasis_topic *from_topic;
668         /*! Destination topic */
669         struct stasis_topic *to_topic;
670 };
671
672 static void forward_dtor(void *obj)
673 {
674         struct stasis_forward *forward = obj;
675
676         ao2_cleanup(forward->from_topic);
677         forward->from_topic = NULL;
678         ao2_cleanup(forward->to_topic);
679         forward->to_topic = NULL;
680 }
681
682 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
683 {
684         int idx;
685         struct stasis_topic *from;
686         struct stasis_topic *to;
687
688         if (!forward) {
689                 return NULL;
690         }
691
692         from = forward->from_topic;
693         to = forward->to_topic;
694
695         topic_lock_both(to, from);
696         AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
697                 AST_VECTOR_ELEM_CLEANUP_NOOP);
698
699         for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
700                 topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
701         }
702         ao2_unlock(from);
703         ao2_unlock(to);
704
705         ao2_cleanup(forward);
706
707         return NULL;
708 }
709
710 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
711         struct stasis_topic *to_topic)
712 {
713         int res;
714         size_t idx;
715         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
716
717         if (!from_topic || !to_topic) {
718                 return NULL;
719         }
720
721         forward = ao2_alloc(sizeof(*forward), forward_dtor);
722         if (!forward) {
723                 return NULL;
724         }
725
726         forward->from_topic = ao2_bump(from_topic);
727         forward->to_topic = ao2_bump(to_topic);
728
729         topic_lock_both(to_topic, from_topic);
730         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
731         if (res != 0) {
732                 ao2_unlock(from_topic);
733                 ao2_unlock(to_topic);
734                 return NULL;
735         }
736
737         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
738                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
739         }
740         ao2_unlock(from_topic);
741         ao2_unlock(to_topic);
742
743         return ao2_bump(forward);
744 }
745
746 static void subscription_change_dtor(void *obj)
747 {
748         struct stasis_subscription_change *change = obj;
749         ast_string_field_free_memory(change);
750         ao2_cleanup(change->topic);
751 }
752
753 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
754 {
755         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
756
757         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
758         if (ast_string_field_init(change, 128)) {
759                 return NULL;
760         }
761
762         ast_string_field_set(change, uniqueid, uniqueid);
763         ast_string_field_set(change, description, description);
764         ao2_ref(topic, +1);
765         change->topic = topic;
766
767         ao2_ref(change, +1);
768         return change;
769 }
770
771 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
772 {
773         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
774         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
775
776         /* This assumes that we have already unsubscribed */
777         ast_assert(stasis_subscription_is_subscribed(sub));
778
779         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
780
781         if (!change) {
782                 return;
783         }
784
785         msg = stasis_message_create(stasis_subscription_change_type(), change);
786
787         if (!msg) {
788                 return;
789         }
790
791         stasis_publish(topic, msg);
792 }
793
794 static void send_subscription_unsubscribe(struct stasis_topic *topic,
795         struct stasis_subscription *sub)
796 {
797         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
798         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
799
800         /* This assumes that we have already unsubscribed */
801         ast_assert(!stasis_subscription_is_subscribed(sub));
802
803         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
804
805         if (!change) {
806                 return;
807         }
808
809         msg = stasis_message_create(stasis_subscription_change_type(), change);
810
811         if (!msg) {
812                 return;
813         }
814
815         stasis_publish(topic, msg);
816
817         /* Now we have to dispatch to the subscription itself */
818         dispatch_message(sub, msg, 0);
819 }
820
821 struct topic_pool_entry {
822         struct stasis_forward *forward;
823         struct stasis_topic *topic;
824 };
825
826 static void topic_pool_entry_dtor(void *obj)
827 {
828         struct topic_pool_entry *entry = obj;
829         entry->forward = stasis_forward_cancel(entry->forward);
830         ao2_cleanup(entry->topic);
831         entry->topic = NULL;
832 }
833
834 static struct topic_pool_entry *topic_pool_entry_alloc(void)
835 {
836         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
837 }
838
839 struct stasis_topic_pool {
840         struct ao2_container *pool_container;
841         struct stasis_topic *pool_topic;
842 };
843
844 static void topic_pool_dtor(void *obj)
845 {
846         struct stasis_topic_pool *pool = obj;
847         ao2_cleanup(pool->pool_container);
848         pool->pool_container = NULL;
849         ao2_cleanup(pool->pool_topic);
850         pool->pool_topic = NULL;
851 }
852
853 static int topic_pool_entry_hash(const void *obj, const int flags)
854 {
855         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
856         return ast_str_case_hash(topic_name);
857 }
858
859 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
860 {
861         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
862         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
863         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
864 }
865
866 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
867 {
868         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
869         if (!pool) {
870                 return NULL;
871         }
872         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
873         ao2_ref(pooled_topic, +1);
874         pool->pool_topic = pooled_topic;
875
876         ao2_ref(pool, +1);
877         return pool;
878 }
879
880 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
881 {
882         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
883         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
884         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
885
886         if (topic_pool_entry) {
887                 return topic_pool_entry->topic;
888         }
889
890         topic_pool_entry = topic_pool_entry_alloc();
891
892         if (!topic_pool_entry) {
893                 return NULL;
894         }
895
896         topic_pool_entry->topic = stasis_topic_create(topic_name);
897         if (!topic_pool_entry->topic) {
898                 return NULL;
899         }
900
901         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
902         if (!topic_pool_entry->forward) {
903                 return NULL;
904         }
905
906         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
907
908         return topic_pool_entry->topic;
909 }
910
911 void stasis_log_bad_type_access(const char *name)
912 {
913         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
914 }
915
916 /*! \brief Cleanup function for graceful shutdowns */
917 static void stasis_cleanup(void)
918 {
919         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
920 }
921
922 int stasis_init(void)
923 {
924         int cache_init;
925
926         /* Be sure the types are cleaned up after the message bus */
927         ast_register_cleanup(stasis_cleanup);
928
929         cache_init = stasis_cache_init();
930         if (cache_init != 0) {
931                 return -1;
932         }
933
934         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
935                 return -1;
936         }
937
938         return 0;
939 }