res_corosync: Update module to work with Stasis (and compile)
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/vector.h"
41
42 /*!
43  * \page stasis-impl Stasis Implementation Notes
44  *
45  * \par Reference counting
46  *
47  * Stasis introduces a number of objects, which are tightly related to one
48  * another. Because we rely on ref-counting for memory management, understanding
49  * these relationships is important to understanding this code.
50  *
51  * \code{.txt}
52  *
53  *   stasis_topic <----> stasis_subscription
54  *             ^          ^
55  *              \        /
56  *               \      /
57  *               dispatch
58  *                  |
59  *                  |
60  *                  v
61  *            stasis_message
62  *                  |
63  *                  |
64  *                  v
65  *          stasis_message_type
66  *
67  * \endcode
68  *
69  * The most troubling thing in this chart is the cyclic reference between
70  * stasis_topic and stasis_subscription. This is both unfortunate, and
71  * necessary. Topics need the subscription in order to dispatch messages;
72  * subscriptions need the topics to unsubscribe and check subscription status.
73  *
74  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
75  * topic's reference to a subscription. When the subcription is destroyed, it
76  * will remove its reference to the topic.
77  *
78  * This means that until a subscription has be explicitly unsubscribed, it will
79  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
80  * The destructors of both have assertions regarding this to catch ref-counting
81  * problems where a subscription or topic has had an extra ao2_cleanup().
82  *
83  * The \ref dispatch object is a transient object, which is posted to a
84  * subscription's taskprocessor to send a message to the subscriber. They have
85  * short life cycles, allocated on one thread, destroyed on another.
86  *
87  * During shutdown, or the deletion of a domain object, there are a flurry of
88  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
89  * are processed. Any one of these cleanups could be the one to actually destroy
90  * a given object, so care must be taken to ensure that an object isn't
91  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
92  * that might happen when a RAII_VAR() goes out of scope.
93  *
94  * \par Typical life cycles
95  *
96  *  \li stasis_topic - There are several topics which live for the duration of
97  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
98  *      are actually fed by shorter-lived topics whose lifetime is associated
99  *      with some domain object (like ast_channel_topic() for a given
100  *      ast_channel).
101  *
102  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
103  *      topics, for similar reasons.
104  *
105  *  \li dispatch - Very short lived; just long enough to post a message to a
106  *      subscriber.
107  *
108  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
109  *      irrelevant. Messages are strictly data and have no behavior associated
110  *      with them, so it doesn't really matter if/when they are destroyed. By
111  *      design, a component could hold a ref to a message forever without any
112  *      ill consequences (aside from consuming more memory).
113  *
114  *  \li stasis_message_type - Long life cycles, typically only destroyed on
115  *      module unloading or _clean_ process exit.
116  *
117  * \par Subscriber shutdown sequencing
118  *
119  * Subscribers are sensitive to shutdown sequencing, specifically in how the
120  * reference message types. This is fully detailed on the wiki at
121  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
122  *
123  * In short, the lifetime of the \a data (and \a callback, if in a module) must
124  * be held until the stasis_subscription_final_message() has been received.
125  * Depending on the structure of the subscriber code, this can be handled by
126  * using stasis_subscription_final_message() to free resources on the final
127  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
128  * block until the unsubscribe has completed.
129  */
130
131 /*! Initial size of the subscribers list. */
132 #define INITIAL_SUBSCRIBERS_MAX 4
133
134 /*! The number of buckets to use for topic pools */
135 #define TOPIC_POOL_BUCKETS 57
136
137 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
138
139 /*! \internal */
140 struct stasis_topic {
141         char *name;
142         /*! Variable length array of the subscribers */
143         AST_VECTOR(, struct stasis_subscription *) subscribers;
144
145         /*! Topics forwarding into this topic */
146         AST_VECTOR(, struct stasis_topic *) upstream_topics;
147 };
148
149 /* Forward declarations for the tightly-coupled subscription object */
150 static int topic_add_subscription(struct stasis_topic *topic,
151         struct stasis_subscription *sub);
152
153 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
154
155 /*! \brief Lock two topics. */
156 #define topic_lock_both(topic1, topic2) \
157         do { \
158                 ao2_lock(topic1); \
159                 while (ao2_trylock(topic2)) { \
160                         AO2_DEADLOCK_AVOIDANCE(topic1); \
161                 } \
162         } while (0)
163
164 static void topic_dtor(void *obj)
165 {
166         struct stasis_topic *topic = obj;
167
168         /* Subscribers hold a reference to topics, so they should all be
169          * unsubscribed before we get here. */
170         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
171
172         ast_free(topic->name);
173         topic->name = NULL;
174
175         AST_VECTOR_FREE(&topic->subscribers);
176         AST_VECTOR_FREE(&topic->upstream_topics);
177 }
178
179 struct stasis_topic *stasis_topic_create(const char *name)
180 {
181         struct stasis_topic *topic;
182         int res = 0;
183
184         topic = ao2_alloc(sizeof(*topic), topic_dtor);
185         if (!topic) {
186                 return NULL;
187         }
188
189         topic->name = ast_strdup(name);
190         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
191         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
192         if (!topic->name || res) {
193                 ao2_cleanup(topic);
194                 return NULL;
195         }
196
197         return topic;
198 }
199
200 const char *stasis_topic_name(const struct stasis_topic *topic)
201 {
202         return topic->name;
203 }
204
205 /*! \internal */
206 struct stasis_subscription {
207         /*! Unique ID for this subscription */
208         char uniqueid[AST_UUID_STR_LEN];
209         /*! Topic subscribed to. */
210         struct stasis_topic *topic;
211         /*! Mailbox for processing incoming messages. */
212         struct ast_taskprocessor *mailbox;
213         /*! Callback function for incoming message processing. */
214         stasis_subscription_cb callback;
215         /*! Data pointer to be handed to the callback. */
216         void *data;
217
218         /*! Condition for joining with subscription. */
219         ast_cond_t join_cond;
220         /*! Flag set when final message for sub has been received.
221          *  Be sure join_lock is held before reading/setting. */
222         int final_message_rxed;
223         /*! Flag set when final message for sub has been processed.
224          *  Be sure join_lock is held before reading/setting. */
225         int final_message_processed;
226 };
227
228 static void subscription_dtor(void *obj)
229 {
230         struct stasis_subscription *sub = obj;
231
232         /* Subscriptions need to be manually unsubscribed before destruction
233          * b/c there's a cyclic reference between topics and subscriptions */
234         ast_assert(!stasis_subscription_is_subscribed(sub));
235         /* If there are any messages in flight to this subscription; that would
236          * be bad. */
237         ast_assert(stasis_subscription_is_done(sub));
238
239         ao2_cleanup(sub->topic);
240         sub->topic = NULL;
241         ast_taskprocessor_unreference(sub->mailbox);
242         sub->mailbox = NULL;
243         ast_cond_destroy(&sub->join_cond);
244 }
245
246 /*!
247  * \brief Invoke the subscription's callback.
248  * \param sub Subscription to invoke.
249  * \param topic Topic message was published to.
250  * \param message Message to send.
251  */
252 static void subscription_invoke(struct stasis_subscription *sub,
253                                   struct stasis_message *message)
254 {
255         /* Notify that the final message has been received */
256         if (stasis_subscription_final_message(sub, message)) {
257                 SCOPED_AO2LOCK(lock, sub);
258
259                 sub->final_message_rxed = 1;
260                 ast_cond_signal(&sub->join_cond);
261         }
262
263         /* Since sub is mostly immutable, no need to lock sub */
264         sub->callback(sub->data, sub, message);
265
266         /* Notify that the final message has been processed */
267         if (stasis_subscription_final_message(sub, message)) {
268                 SCOPED_AO2LOCK(lock, sub);
269
270                 sub->final_message_processed = 1;
271                 ast_cond_signal(&sub->join_cond);
272         }
273 }
274
275 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
276 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
277
278 struct stasis_subscription *internal_stasis_subscribe(
279         struct stasis_topic *topic,
280         stasis_subscription_cb callback,
281         void *data,
282         int needs_mailbox)
283 {
284         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
285
286         if (!topic) {
287                 return NULL;
288         }
289
290         /* The ao2 lock is used for join_cond. */
291         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
292         if (!sub) {
293                 return NULL;
294         }
295
296         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
297
298         if (needs_mailbox) {
299                 /* With a small number of subscribers, a thread-per-sub is
300                  * acceptable. If our usage changes so that we have larger
301                  * numbers of subscribers, we'll probably want to consider
302                  * a threadpool. We had that originally, but with so few
303                  * subscribers it was actually a performance loss instead of
304                  * a gain.
305                  */
306                 sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
307                         TPS_REF_DEFAULT);
308                 if (!sub->mailbox) {
309                         return NULL;
310                 }
311                 ast_taskprocessor_set_local(sub->mailbox, sub);
312                 /* Taskprocessor has a reference */
313                 ao2_ref(sub, +1);
314         }
315
316         ao2_ref(topic, +1);
317         sub->topic = topic;
318         sub->callback = callback;
319         sub->data = data;
320         ast_cond_init(&sub->join_cond, NULL);
321
322         if (topic_add_subscription(topic, sub) != 0) {
323                 return NULL;
324         }
325         send_subscription_subscribe(topic, sub);
326
327         ao2_ref(sub, +1);
328         return sub;
329 }
330
331 struct stasis_subscription *stasis_subscribe(
332         struct stasis_topic *topic,
333         stasis_subscription_cb callback,
334         void *data)
335 {
336         return internal_stasis_subscribe(topic, callback, data, 1);
337 }
338
339 static int sub_cleanup(void *data)
340 {
341         struct stasis_subscription *sub = data;
342         ao2_cleanup(sub);
343         return 0;
344 }
345
346 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
347 {
348         /* The subscription may be the last ref to this topic. Hold
349          * the topic ref open until after the unlock. */
350         RAII_VAR(struct stasis_topic *, topic,
351                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
352
353         if (!sub) {
354                 return NULL;
355         }
356
357         /* We have to remove the subscription first, to ensure the unsubscribe
358          * is the final message */
359         if (topic_remove_subscription(sub->topic, sub) != 0) {
360                 ast_log(LOG_ERROR,
361                         "Internal error: subscription has invalid topic\n");
362                 return NULL;
363         }
364
365         /* Now let everyone know about the unsubscribe */
366         send_subscription_unsubscribe(topic, sub);
367
368         /* When all that's done, remove the ref the mailbox has on the sub */
369         if (sub->mailbox) {
370                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
371         }
372
373         /* Unsubscribing unrefs the subscription */
374         ao2_cleanup(sub);
375         return NULL;
376 }
377
378 void stasis_subscription_join(struct stasis_subscription *subscription)
379 {
380         if (subscription) {
381                 SCOPED_AO2LOCK(lock, subscription);
382
383                 /* Wait until the processed flag has been set */
384                 while (!subscription->final_message_processed) {
385                         ast_cond_wait(&subscription->join_cond,
386                                 ao2_object_get_lockaddr(subscription));
387                 }
388         }
389 }
390
391 int stasis_subscription_is_done(struct stasis_subscription *subscription)
392 {
393         if (subscription) {
394                 SCOPED_AO2LOCK(lock, subscription);
395
396                 return subscription->final_message_rxed;
397         }
398
399         /* Null subscription is about as done as you can get */
400         return 1;
401 }
402
403 struct stasis_subscription *stasis_unsubscribe_and_join(
404         struct stasis_subscription *subscription)
405 {
406         if (!subscription) {
407                 return NULL;
408         }
409
410         /* Bump refcount to hold it past the unsubscribe */
411         ao2_ref(subscription, +1);
412         stasis_unsubscribe(subscription);
413         stasis_subscription_join(subscription);
414         /* Now decrement the refcount back */
415         ao2_cleanup(subscription);
416         return NULL;
417 }
418
419 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
420 {
421         if (sub) {
422                 size_t i;
423                 struct stasis_topic *topic = sub->topic;
424                 SCOPED_AO2LOCK(lock_topic, topic);
425
426                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
427                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
428                                 return 1;
429                         }
430                 }
431         }
432
433         return 0;
434 }
435
436 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
437 {
438         return sub->uniqueid;
439 }
440
441 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
442 {
443         struct stasis_subscription_change *change;
444
445         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
446                 return 0;
447         }
448
449         change = stasis_message_data(msg);
450         if (strcmp("Unsubscribe", change->description)) {
451                 return 0;
452         }
453
454         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
455                 return 0;
456         }
457
458         return 1;
459 }
460
461 /*!
462  * \brief Add a subscriber to a topic.
463  * \param topic Topic
464  * \param sub Subscriber
465  * \return 0 on success
466  * \return Non-zero on error
467  */
468 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
469 {
470         size_t idx;
471         SCOPED_AO2LOCK(lock, topic);
472
473         /* The reference from the topic to the subscription is shared with
474          * the owner of the subscription, which will explicitly unsubscribe
475          * to release it.
476          *
477          * If we bumped the refcount here, the owner would have to unsubscribe
478          * and cleanup, which is a bit awkward. */
479         AST_VECTOR_APPEND(&topic->subscribers, sub);
480
481         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
482                 topic_add_subscription(
483                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
484         }
485
486         return 0;
487 }
488
489 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
490 {
491         size_t idx;
492         SCOPED_AO2LOCK(lock_topic, topic);
493
494         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
495                 topic_remove_subscription(
496                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
497         }
498
499         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
500                 AST_VECTOR_ELEM_CLEANUP_NOOP);
501 }
502
503 /*!
504  * \internal \brief Dispatch a message to a subscriber asynchronously
505  * \param local \ref ast_taskprocessor_local object
506  * \return 0
507  */
508 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
509 {
510         struct stasis_subscription *sub = local->local_data;
511         struct stasis_message *message = local->data;
512
513         subscription_invoke(sub, message);
514         ao2_cleanup(message);
515
516         return 0;
517 }
518
519 /*!
520  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
521  * a published message to a subscriber
522  */
523 struct sync_task_data {
524         ast_mutex_t lock;
525         ast_cond_t cond;
526         int complete;
527         void *task_data;
528 };
529
530 /*!
531  * \internal \brief Dispatch a message to a subscriber synchronously
532  * \param local \ref ast_taskprocessor_local object
533  * \return 0
534  */
535 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
536 {
537         struct stasis_subscription *sub = local->local_data;
538         struct sync_task_data *std = local->data;
539         struct stasis_message *message = std->task_data;
540
541         subscription_invoke(sub, message);
542         ao2_cleanup(message);
543
544         ast_mutex_lock(&std->lock);
545         std->complete = 1;
546         ast_cond_signal(&std->cond);
547         ast_mutex_unlock(&std->lock);
548
549         return 0;
550 }
551
552 /*!
553  * \internal \brief Dispatch a message to a subscriber
554  * \param sub The subscriber to dispatch to
555  * \param message The message to send
556  * \param synchronous If non-zero, synchronize on the subscriber receiving
557  * the message
558  */
559 static void dispatch_message(struct stasis_subscription *sub,
560         struct stasis_message *message,
561         int synchronous)
562 {
563         if (!sub->mailbox) {
564                 /* Dispatch directly */
565                 subscription_invoke(sub, message);
566                 return;
567         }
568
569         /* Bump the message for the taskprocessor push. This will get de-ref'd
570          * by the task processor callback.
571          */
572         ao2_bump(message);
573         if (!synchronous) {
574                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
575                         /* Push failed; ugh. */
576                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
577                         ao2_cleanup(message);
578                 }
579         } else {
580                 struct sync_task_data std;
581
582                 ast_mutex_init(&std.lock);
583                 ast_cond_init(&std.cond, NULL);
584                 std.complete = 0;
585                 std.task_data = message;
586
587                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
588                         /* Push failed; ugh. */
589                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
590                         ao2_cleanup(message);
591                         ast_mutex_destroy(&std.lock);
592                         ast_cond_destroy(&std.cond);
593                         return;
594                 }
595
596                 ast_mutex_lock(&std.lock);
597                 while (!std.complete) {
598                         ast_cond_wait(&std.cond, &std.lock);
599                 }
600                 ast_mutex_unlock(&std.lock);
601
602                 ast_mutex_destroy(&std.lock);
603                 ast_cond_destroy(&std.cond);
604         }
605 }
606
607 /*!
608  * \internal \brief Publish a message to a topic's subscribers
609  * \brief topic The topic to publish to
610  * \brief message The message to publish
611  * \brief sync_sub An optional subscriber of the topic to publish synchronously
612  * to
613  */
614 static void publish_msg(struct stasis_topic *topic,
615         struct stasis_message *message, struct stasis_subscription *sync_sub)
616 {
617         size_t i;
618
619         ast_assert(topic != NULL);
620         ast_assert(message != NULL);
621
622         /*
623          * The topic may be unref'ed by the subscription invocation.
624          * Make sure we hold onto a reference while dispatching.
625          */
626         ao2_ref(topic, +1);
627         ao2_lock(topic);
628         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
629                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
630
631                 ast_assert(sub != NULL);
632
633                 dispatch_message(sub, message, (sub == sync_sub));
634         }
635         ao2_unlock(topic);
636         ao2_ref(topic, -1);
637 }
638
639 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
640 {
641         publish_msg(topic, message, NULL);
642 }
643
644 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
645 {
646         ast_assert(sub != NULL);
647
648         publish_msg(sub->topic, message, sub);
649 }
650
651 /*!
652  * \brief Forwarding information
653  *
654  * Any message posted to \a from_topic is forwarded to \a to_topic.
655  *
656  * In cases where both the \a from_topic and \a to_topic need to be locked,
657  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
658  */
659 struct stasis_forward {
660         /*! Originating topic */
661         struct stasis_topic *from_topic;
662         /*! Destination topic */
663         struct stasis_topic *to_topic;
664 };
665
666 static void forward_dtor(void *obj)
667 {
668         struct stasis_forward *forward = obj;
669
670         ao2_cleanup(forward->from_topic);
671         forward->from_topic = NULL;
672         ao2_cleanup(forward->to_topic);
673         forward->to_topic = NULL;
674 }
675
676 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
677 {
678         int idx;
679         struct stasis_topic *from;
680         struct stasis_topic *to;
681
682         if (!forward) {
683                 return NULL;
684         }
685
686         from = forward->from_topic;
687         to = forward->to_topic;
688
689         if (from && to) {
690                 topic_lock_both(to, from);
691                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
692                         AST_VECTOR_ELEM_CLEANUP_NOOP);
693
694                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
695                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
696                 }
697                 ao2_unlock(from);
698                 ao2_unlock(to);
699         }
700
701         ao2_cleanup(forward);
702
703         return NULL;
704 }
705
706 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
707         struct stasis_topic *to_topic)
708 {
709         int res;
710         size_t idx;
711         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
712
713         if (!from_topic || !to_topic) {
714                 return NULL;
715         }
716
717         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
718         if (!forward) {
719                 return NULL;
720         }
721
722         /* Forwards to ourselves are implicit. */
723         if (to_topic == from_topic) {
724                 return ao2_bump(forward);
725         }
726
727         forward->from_topic = ao2_bump(from_topic);
728         forward->to_topic = ao2_bump(to_topic);
729
730         topic_lock_both(to_topic, from_topic);
731         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
732         if (res != 0) {
733                 ao2_unlock(from_topic);
734                 ao2_unlock(to_topic);
735                 return NULL;
736         }
737
738         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
739                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
740         }
741         ao2_unlock(from_topic);
742         ao2_unlock(to_topic);
743
744         return ao2_bump(forward);
745 }
746
747 static void subscription_change_dtor(void *obj)
748 {
749         struct stasis_subscription_change *change = obj;
750
751         ast_string_field_free_memory(change);
752         ao2_cleanup(change->topic);
753 }
754
755 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
756 {
757         struct stasis_subscription_change *change;
758
759         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
760         if (!change || ast_string_field_init(change, 128)) {
761                 ao2_cleanup(change);
762                 return NULL;
763         }
764
765         ast_string_field_set(change, uniqueid, uniqueid);
766         ast_string_field_set(change, description, description);
767         ao2_ref(topic, +1);
768         change->topic = topic;
769
770         return change;
771 }
772
773 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
774 {
775         struct stasis_subscription_change *change;
776         struct stasis_message *msg;
777
778         /* This assumes that we have already unsubscribed */
779         ast_assert(stasis_subscription_is_subscribed(sub));
780
781         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
782         if (!change) {
783                 return;
784         }
785
786         msg = stasis_message_create(stasis_subscription_change_type(), change);
787         if (!msg) {
788                 ao2_cleanup(change);
789                 return;
790         }
791
792         stasis_publish(topic, msg);
793         ao2_cleanup(msg);
794         ao2_cleanup(change);
795 }
796
797 static void send_subscription_unsubscribe(struct stasis_topic *topic,
798         struct stasis_subscription *sub)
799 {
800         struct stasis_subscription_change *change;
801         struct stasis_message *msg;
802
803         /* This assumes that we have already unsubscribed */
804         ast_assert(!stasis_subscription_is_subscribed(sub));
805
806         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
807         if (!change) {
808                 return;
809         }
810
811         msg = stasis_message_create(stasis_subscription_change_type(), change);
812         if (!msg) {
813                 ao2_cleanup(change);
814                 return;
815         }
816
817         stasis_publish(topic, msg);
818
819         /* Now we have to dispatch to the subscription itself */
820         dispatch_message(sub, msg, 0);
821
822         ao2_cleanup(msg);
823         ao2_cleanup(change);
824 }
825
826 struct topic_pool_entry {
827         struct stasis_forward *forward;
828         struct stasis_topic *topic;
829 };
830
831 static void topic_pool_entry_dtor(void *obj)
832 {
833         struct topic_pool_entry *entry = obj;
834
835         entry->forward = stasis_forward_cancel(entry->forward);
836         ao2_cleanup(entry->topic);
837         entry->topic = NULL;
838 }
839
840 static struct topic_pool_entry *topic_pool_entry_alloc(void)
841 {
842         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
843                 AO2_ALLOC_OPT_LOCK_NOLOCK);
844 }
845
846 struct stasis_topic_pool {
847         struct ao2_container *pool_container;
848         struct stasis_topic *pool_topic;
849 };
850
851 static void topic_pool_dtor(void *obj)
852 {
853         struct stasis_topic_pool *pool = obj;
854
855         ao2_cleanup(pool->pool_container);
856         pool->pool_container = NULL;
857         ao2_cleanup(pool->pool_topic);
858         pool->pool_topic = NULL;
859 }
860
861 static int topic_pool_entry_hash(const void *obj, const int flags)
862 {
863         const struct topic_pool_entry *object;
864         const char *key;
865
866         switch (flags & OBJ_SEARCH_MASK) {
867         case OBJ_SEARCH_KEY:
868                 key = obj;
869                 break;
870         case OBJ_SEARCH_OBJECT:
871                 object = obj;
872                 key = stasis_topic_name(object->topic);
873                 break;
874         default:
875                 /* Hash can only work on something with a full key. */
876                 ast_assert(0);
877                 return 0;
878         }
879         return ast_str_case_hash(key);
880 }
881
882 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
883 {
884         const struct topic_pool_entry *object_left = obj;
885         const struct topic_pool_entry *object_right = arg;
886         const char *right_key = arg;
887         int cmp;
888
889         switch (flags & OBJ_SEARCH_MASK) {
890         case OBJ_SEARCH_OBJECT:
891                 right_key = stasis_topic_name(object_right->topic);
892                 /* Fall through */
893         case OBJ_SEARCH_KEY:
894                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
895                 break;
896         case OBJ_SEARCH_PARTIAL_KEY:
897                 /* Not supported by container */
898                 ast_assert(0);
899                 cmp = -1;
900                 break;
901         default:
902                 /*
903                  * What arg points to is specific to this traversal callback
904                  * and has no special meaning to astobj2.
905                  */
906                 cmp = 0;
907                 break;
908         }
909         if (cmp) {
910                 return 0;
911         }
912         /*
913          * At this point the traversal callback is identical to a sorted
914          * container.
915          */
916         return CMP_MATCH;
917 }
918
919 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
920 {
921         struct stasis_topic_pool *pool;
922
923         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
924         if (!pool) {
925                 return NULL;
926         }
927
928         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
929                 topic_pool_entry_hash, topic_pool_entry_cmp);
930         if (!pool->pool_container) {
931                 ao2_cleanup(pool);
932                 return NULL;
933         }
934         ao2_ref(pooled_topic, +1);
935         pool->pool_topic = pooled_topic;
936
937         return pool;
938 }
939
940 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
941 {
942         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
943         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
944
945         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
946         if (topic_pool_entry) {
947                 return topic_pool_entry->topic;
948         }
949
950         topic_pool_entry = topic_pool_entry_alloc();
951         if (!topic_pool_entry) {
952                 return NULL;
953         }
954
955         topic_pool_entry->topic = stasis_topic_create(topic_name);
956         if (!topic_pool_entry->topic) {
957                 return NULL;
958         }
959
960         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
961         if (!topic_pool_entry->forward) {
962                 return NULL;
963         }
964
965         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
966                 return NULL;
967         }
968
969         return topic_pool_entry->topic;
970 }
971
972 void stasis_log_bad_type_access(const char *name)
973 {
974         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
975 }
976
977 /*! \brief Cleanup function for graceful shutdowns */
978 static void stasis_cleanup(void)
979 {
980         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
981 }
982
983 int stasis_init(void)
984 {
985         int cache_init;
986
987         /* Be sure the types are cleaned up after the message bus */
988         ast_register_cleanup(stasis_cleanup);
989
990         cache_init = stasis_cache_init();
991         if (cache_init != 0) {
992                 return -1;
993         }
994
995         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
996                 return -1;
997         }
998
999         return 0;
1000 }