Multiple revisions 399887,400138,400178,400180-400181
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/vector.h"
41
42 /*!
43  * \page stasis-impl Stasis Implementation Notes
44  *
45  * \par Reference counting
46  *
47  * Stasis introduces a number of objects, which are tightly related to one
48  * another. Because we rely on ref-counting for memory management, understanding
49  * these relationships is important to understanding this code.
50  *
51  * \code{.txt}
52  *
53  *   stasis_topic <----> stasis_subscription
54  *             ^          ^
55  *              \        /
56  *               \      /
57  *               dispatch
58  *                  |
59  *                  |
60  *                  v
61  *            stasis_message
62  *                  |
63  *                  |
64  *                  v
65  *          stasis_message_type
66  *
67  * \endcode
68  *
69  * The most troubling thing in this chart is the cyclic reference between
70  * stasis_topic and stasis_subscription. This is both unfortunate, and
71  * necessary. Topics need the subscription in order to dispatch messages;
72  * subscriptions need the topics to unsubscribe and check subscription status.
73  *
74  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
75  * topic's reference to a subscription. When the subcription is destroyed, it
76  * will remove its reference to the topic.
77  *
78  * This means that until a subscription has be explicitly unsubscribed, it will
79  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
80  * The destructors of both have assertions regarding this to catch ref-counting
81  * problems where a subscription or topic has had an extra ao2_cleanup().
82  *
83  * The \ref dispatch object is a transient object, which is posted to a
84  * subscription's taskprocessor to send a message to the subscriber. They have
85  * short life cycles, allocated on one thread, destroyed on another.
86  *
87  * During shutdown, or the deletion of a domain object, there are a flurry of
88  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
89  * are processed. Any one of these cleanups could be the one to actually destroy
90  * a given object, so care must be taken to ensure that an object isn't
91  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
92  * that might happen when a RAII_VAR() goes out of scope.
93  *
94  * \par Typical life cycles
95  *
96  *  \li stasis_topic - There are several topics which live for the duration of
97  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
98  *      are actually fed by shorter-lived topics whose lifetime is associated
99  *      with some domain object (like ast_channel_topic() for a given
100  *      ast_channel).
101  *
102  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
103  *      topics, for similar reasons.
104  *
105  *  \li dispatch - Very short lived; just long enough to post a message to a
106  *      subscriber.
107  *
108  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
109  *      irrelevant. Messages are strictly data and have no behavior associated
110  *      with them, so it doesn't really matter if/when they are destroyed. By
111  *      design, a component could hold a ref to a message forever without any
112  *      ill consequences (aside from consuming more memory).
113  *
114  *  \li stasis_message_type - Long life cycles, typically only destroyed on
115  *      module unloading or _clean_ process exit.
116  *
117  * \par Subscriber shutdown sequencing
118  *
119  * Subscribers are sensitive to shutdown sequencing, specifically in how the
120  * reference message types. This is fully detailed on the wiki at
121  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
122  *
123  * In short, the lifetime of the \a data (and \a callback, if in a module) must
124  * be held until the stasis_subscription_final_message() has been received.
125  * Depending on the structure of the subscriber code, this can be handled by
126  * using stasis_subscription_final_message() to free resources on the final
127  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
128  * block until the unsubscribe has completed.
129  */
130
131 /*! Initial size of the subscribers list. */
132 #define INITIAL_SUBSCRIBERS_MAX 4
133
134 /*! The number of buckets to use for topic pools */
135 #define TOPIC_POOL_BUCKETS 57
136
137 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
138
139 /*! \internal */
140 struct stasis_topic {
141         char *name;
142         /*! Variable length array of the subscribers */
143         ast_vector(struct stasis_subscription *) subscribers;
144
145         /*! Topics forwarding into this topic */
146         ast_vector(struct stasis_topic *) upstream_topics;
147 };
148
149 /* Forward declarations for the tightly-coupled subscription object */
150 static int topic_add_subscription(struct stasis_topic *topic,
151         struct stasis_subscription *sub);
152
153 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
154
155 static void topic_dtor(void *obj)
156 {
157         struct stasis_topic *topic = obj;
158
159         /* Subscribers hold a reference to topics, so they should all be
160          * unsubscribed before we get here. */
161         ast_assert(ast_vector_size(topic->subscribers) == 0);
162         ast_free(topic->name);
163         topic->name = NULL;
164
165         ast_vector_free(topic->subscribers);
166         ast_vector_free(topic->upstream_topics);
167 }
168
169 struct stasis_topic *stasis_topic_create(const char *name)
170 {
171         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
172         int res = 0;
173
174         topic = ao2_alloc(sizeof(*topic), topic_dtor);
175
176         if (!topic) {
177                 return NULL;
178         }
179
180         topic->name = ast_strdup(name);
181         if (!topic->name) {
182                 return NULL;
183         }
184
185         res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
186         res |= ast_vector_init(topic->upstream_topics, 0);
187
188         if (res != 0) {
189                 return NULL;
190         }
191
192         ao2_ref(topic, +1);
193         return topic;
194 }
195
196 const char *stasis_topic_name(const struct stasis_topic *topic)
197 {
198         return topic->name;
199 }
200
201 /*! \internal */
202 struct stasis_subscription {
203         /*! Unique ID for this subscription */
204         char uniqueid[AST_UUID_STR_LEN];
205         /*! Topic subscribed to. */
206         struct stasis_topic *topic;
207         /*! Mailbox for processing incoming messages. */
208         struct ast_taskprocessor *mailbox;
209         /*! Callback function for incoming message processing. */
210         stasis_subscription_cb callback;
211         /*! Data pointer to be handed to the callback. */
212         void *data;
213
214         /*! Lock for completion flags \c final_message_{rxed,processed}. */
215         ast_mutex_t join_lock;
216         /*! Condition for joining with subscription. */
217         ast_cond_t join_cond;
218         /*! Flag set when final message for sub has been received.
219          *  Be sure join_lock is held before reading/setting. */
220         int final_message_rxed;
221         /*! Flag set when final message for sub has been processed.
222          *  Be sure join_lock is held before reading/setting. */
223         int final_message_processed;
224 };
225
226 static void subscription_dtor(void *obj)
227 {
228         struct stasis_subscription *sub = obj;
229
230         /* Subscriptions need to be manually unsubscribed before destruction
231          * b/c there's a cyclic reference between topics and subscriptions */
232         ast_assert(!stasis_subscription_is_subscribed(sub));
233         /* If there are any messages in flight to this subscription; that would
234          * be bad. */
235         ast_assert(stasis_subscription_is_done(sub));
236
237         ao2_cleanup(sub->topic);
238         sub->topic = NULL;
239         ast_taskprocessor_unreference(sub->mailbox);
240         sub->mailbox = NULL;
241         ast_mutex_destroy(&sub->join_lock);
242         ast_cond_destroy(&sub->join_cond);
243 }
244
245 /*!
246  * \brief Invoke the subscription's callback.
247  * \param sub Subscription to invoke.
248  * \param topic Topic message was published to.
249  * \param message Message to send.
250  */
251 static void subscription_invoke(struct stasis_subscription *sub,
252                                   struct stasis_message *message)
253 {
254         /* Notify that the final message has been received */
255         if (stasis_subscription_final_message(sub, message)) {
256                 SCOPED_MUTEX(lock, &sub->join_lock);
257                 sub->final_message_rxed = 1;
258                 ast_cond_signal(&sub->join_cond);
259         }
260
261         /* Since sub is mostly immutable, no need to lock sub */
262         sub->callback(sub->data, sub, message);
263
264         /* Notify that the final message has been processed */
265         if (stasis_subscription_final_message(sub, message)) {
266                 SCOPED_MUTEX(lock, &sub->join_lock);
267                 sub->final_message_processed = 1;
268                 ast_cond_signal(&sub->join_cond);
269         }
270 }
271
272 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
273 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
274
275 struct stasis_subscription *internal_stasis_subscribe(
276         struct stasis_topic *topic,
277         stasis_subscription_cb callback,
278         void *data,
279         int needs_mailbox)
280 {
281         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
282
283         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
284         if (!sub) {
285                 return NULL;
286         }
287
288         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
289
290         if (needs_mailbox) {
291                 /* With a small number of subscribers, a thread-per-sub is
292                  * acceptable. If our usage changes so that we have larger
293                  * numbers of subscribers, we'll probably want to consider
294                  * a threadpool. We had that originally, but with so few
295                  * subscribers it was actually a performance loss instead of
296                  * a gain.
297                  */
298                 sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
299                         TPS_REF_DEFAULT);
300                 if (!sub->mailbox) {
301                         return NULL;
302                 }
303                 ast_taskprocessor_set_local(sub->mailbox, sub);
304                 /* Taskprocessor has a reference */
305                 ao2_ref(sub, +1);
306         }
307
308         ao2_ref(topic, +1);
309         sub->topic = topic;
310         sub->callback = callback;
311         sub->data = data;
312         ast_mutex_init(&sub->join_lock);
313         ast_cond_init(&sub->join_cond, NULL);
314
315         if (topic_add_subscription(topic, sub) != 0) {
316                 return NULL;
317         }
318         send_subscription_subscribe(topic, sub);
319
320         ao2_ref(sub, +1);
321         return sub;
322 }
323
324 struct stasis_subscription *stasis_subscribe(
325         struct stasis_topic *topic,
326         stasis_subscription_cb callback,
327         void *data)
328 {
329         return internal_stasis_subscribe(topic, callback, data, 1);
330 }
331
332 static int sub_cleanup(void *data)
333 {
334         struct stasis_subscription *sub = data;
335         ao2_cleanup(sub);
336         return 0;
337 }
338
339 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
340 {
341         /* The subscription may be the last ref to this topic. Hold
342          * the topic ref open until after the unlock. */
343         RAII_VAR(struct stasis_topic *, topic,
344                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
345
346         if (!sub) {
347                 return NULL;
348         }
349
350         /* We have to remove the subscription first, to ensure the unsubscribe
351          * is the final message */
352         if (topic_remove_subscription(sub->topic, sub) != 0) {
353                 ast_log(LOG_ERROR,
354                         "Internal error: subscription has invalid topic\n");
355                 return NULL;
356         }
357
358         /* Now let everyone know about the unsubscribe */
359         send_subscription_unsubscribe(topic, sub);
360
361         /* When all that's done, remove the ref the mailbox has on the sub */
362         if (sub->mailbox) {
363                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
364         }
365
366         /* Unsubscribing unrefs the subscription */
367         ao2_cleanup(sub);
368         return NULL;
369 }
370
371 void stasis_subscription_join(struct stasis_subscription *subscription)
372 {
373         if (subscription) {
374                 SCOPED_MUTEX(lock, &subscription->join_lock);
375                 /* Wait until the processed flag has been set */
376                 while (!subscription->final_message_processed) {
377                         ast_cond_wait(&subscription->join_cond,
378                                 &subscription->join_lock);
379                 }
380         }
381 }
382
383 int stasis_subscription_is_done(struct stasis_subscription *subscription)
384 {
385         if (subscription) {
386                 SCOPED_MUTEX(lock, &subscription->join_lock);
387                 return subscription->final_message_rxed;
388         }
389
390         /* Null subscription is about as done as you can get */
391         return 1;
392 }
393
394 struct stasis_subscription *stasis_unsubscribe_and_join(
395         struct stasis_subscription *subscription)
396 {
397         if (!subscription) {
398                 return NULL;
399         }
400
401         /* Bump refcount to hold it past the unsubscribe */
402         ao2_ref(subscription, +1);
403         stasis_unsubscribe(subscription);
404         stasis_subscription_join(subscription);
405         /* Now decrement the refcount back */
406         ao2_cleanup(subscription);
407         return NULL;
408 }
409
410 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
411 {
412         if (sub) {
413                 size_t i;
414                 struct stasis_topic *topic = sub->topic;
415                 SCOPED_AO2LOCK(lock_topic, topic);
416
417                 for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
418                         if (ast_vector_get(topic->subscribers, i) == sub) {
419                                 return 1;
420                         }
421                 }
422         }
423
424         return 0;
425 }
426
427 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
428 {
429         return sub->uniqueid;
430 }
431
432 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
433 {
434         struct stasis_subscription_change *change;
435         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
436                 return 0;
437         }
438
439         change = stasis_message_data(msg);
440         if (strcmp("Unsubscribe", change->description)) {
441                 return 0;
442         }
443
444         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
445                 return 0;
446         }
447
448         return 1;
449 }
450
451 /*!
452  * \brief Add a subscriber to a topic.
453  * \param topic Topic
454  * \param sub Subscriber
455  * \return 0 on success
456  * \return Non-zero on error
457  */
458 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
459 {
460         size_t idx;
461         SCOPED_AO2LOCK(lock, topic);
462
463         /* The reference from the topic to the subscription is shared with
464          * the owner of the subscription, which will explicitly unsubscribe
465          * to release it.
466          *
467          * If we bumped the refcount here, the owner would have to unsubscribe
468          * and cleanup, which is a bit awkward. */
469         ast_vector_append(topic->subscribers, sub);
470
471         for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
472                 topic_add_subscription(
473                         ast_vector_get(topic->upstream_topics, idx), sub);
474         }
475
476         return 0;
477 }
478
479 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
480 {
481         size_t idx;
482         SCOPED_AO2LOCK(lock_topic, topic);
483
484         for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
485                 topic_remove_subscription(
486                         ast_vector_get(topic->upstream_topics, idx), sub);
487         }
488
489         return ast_vector_remove_elem_unordered(topic->subscribers, sub);
490 }
491
492 /*!
493  * \brief Dispatch a message to a subscriber
494  * \param data \ref dispatch object
495  * \return 0
496  */
497 static int dispatch_exec(struct ast_taskprocessor_local *local)
498 {
499         struct stasis_subscription *sub = local->local_data;
500         struct stasis_message *message = local->data;
501
502         subscription_invoke(sub, message);
503         ao2_cleanup(message);
504
505         return 0;
506 }
507
508 static void dispatch_message(struct stasis_subscription *sub,
509         struct stasis_message *message)
510 {
511         if (sub->mailbox) {
512                 ao2_bump(message);
513                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
514                         /* Push failed; ugh. */
515                         ast_log(LOG_DEBUG, "Dropping dispatch\n");
516                         ao2_cleanup(message);
517                 }
518         } else {
519                 /* Dispatch directly */
520                 subscription_invoke(sub, message);
521         }
522 }
523
524 void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
525 {
526         size_t i;
527         /* The topic may be unref'ed by the subscription invocation.
528          * Make sure we hold onto a reference while dispatching. */
529         RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
530                 ao2_cleanup);
531         SCOPED_AO2LOCK(lock, topic);
532
533         ast_assert(topic != NULL);
534         ast_assert(message != NULL);
535
536         for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
537                 struct stasis_subscription *sub =
538                         ast_vector_get(topic->subscribers, i);
539
540                 ast_assert(sub != NULL);
541
542                 dispatch_message(sub, message);
543         }
544 }
545
546 /*!
547  * \brief Forwarding information
548  *
549  * Any message posted to \a from_topic is forwarded to \a to_topic.
550  *
551  * In cases where both the \a from_topic and \a to_topic need to be locked,
552  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
553  */
554 struct stasis_forward {
555         /*! Originating topic */
556         struct stasis_topic *from_topic;
557         /*! Destination topic */
558         struct stasis_topic *to_topic;
559 };
560
561 static void forward_dtor(void *obj)
562 {
563         struct stasis_forward *forward = obj;
564
565         ao2_cleanup(forward->from_topic);
566         forward->from_topic = NULL;
567         ao2_cleanup(forward->to_topic);
568         forward->to_topic = NULL;
569 }
570
571 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
572 {
573         if (forward) {
574                 int idx;
575
576                 struct stasis_topic *from = forward->from_topic;
577                 struct stasis_topic *to = forward->to_topic;
578
579                 SCOPED_AO2LOCK(to_lock, to);
580
581                 ast_vector_remove_elem_unordered(to->upstream_topics, from);
582
583                 ao2_lock(from);
584                 for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
585                         topic_remove_subscription(
586                                 from, ast_vector_get(to->subscribers, idx));
587                 }
588                 ao2_unlock(from);
589         }
590
591         ao2_cleanup(forward);
592
593         return NULL;
594 }
595
596 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
597         struct stasis_topic *to_topic)
598 {
599         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
600
601         if (!from_topic || !to_topic) {
602                 return NULL;
603         }
604
605         forward = ao2_alloc(sizeof(*forward), forward_dtor);
606         if (!forward) {
607                 return NULL;
608         }
609
610         forward->from_topic = ao2_bump(from_topic);
611         forward->to_topic = ao2_bump(to_topic);
612
613         {
614                 SCOPED_AO2LOCK(lock, to_topic);
615                 int res;
616
617                 res = ast_vector_append(to_topic->upstream_topics, from_topic);
618                 if (res != 0) {
619                         return NULL;
620                 }
621
622                 {
623                         SCOPED_AO2LOCK(lock, from_topic);
624                         size_t idx;
625                         for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
626                                 topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
627                         }
628                 }
629         }
630
631         return ao2_bump(forward);
632 }
633
634 static void subscription_change_dtor(void *obj)
635 {
636         struct stasis_subscription_change *change = obj;
637         ast_string_field_free_memory(change);
638         ao2_cleanup(change->topic);
639 }
640
641 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
642 {
643         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
644
645         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
646         if (ast_string_field_init(change, 128)) {
647                 return NULL;
648         }
649
650         ast_string_field_set(change, uniqueid, uniqueid);
651         ast_string_field_set(change, description, description);
652         ao2_ref(topic, +1);
653         change->topic = topic;
654
655         ao2_ref(change, +1);
656         return change;
657 }
658
659 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
660 {
661         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
662         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
663
664         /* This assumes that we have already unsubscribed */
665         ast_assert(stasis_subscription_is_subscribed(sub));
666
667         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
668
669         if (!change) {
670                 return;
671         }
672
673         msg = stasis_message_create(stasis_subscription_change_type(), change);
674
675         if (!msg) {
676                 return;
677         }
678
679         stasis_publish(topic, msg);
680 }
681
682 static void send_subscription_unsubscribe(struct stasis_topic *topic,
683         struct stasis_subscription *sub)
684 {
685         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
686         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
687
688         /* This assumes that we have already unsubscribed */
689         ast_assert(!stasis_subscription_is_subscribed(sub));
690
691         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
692
693         if (!change) {
694                 return;
695         }
696
697         msg = stasis_message_create(stasis_subscription_change_type(), change);
698
699         if (!msg) {
700                 return;
701         }
702
703         stasis_publish(topic, msg);
704
705         /* Now we have to dispatch to the subscription itself */
706         dispatch_message(sub, msg);
707 }
708
709 struct topic_pool_entry {
710         struct stasis_forward *forward;
711         struct stasis_topic *topic;
712 };
713
714 static void topic_pool_entry_dtor(void *obj)
715 {
716         struct topic_pool_entry *entry = obj;
717         entry->forward = stasis_forward_cancel(entry->forward);
718         ao2_cleanup(entry->topic);
719         entry->topic = NULL;
720 }
721
722 static struct topic_pool_entry *topic_pool_entry_alloc(void)
723 {
724         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
725 }
726
727 struct stasis_topic_pool {
728         struct ao2_container *pool_container;
729         struct stasis_topic *pool_topic;
730 };
731
732 static void topic_pool_dtor(void *obj)
733 {
734         struct stasis_topic_pool *pool = obj;
735         ao2_cleanup(pool->pool_container);
736         pool->pool_container = NULL;
737         ao2_cleanup(pool->pool_topic);
738         pool->pool_topic = NULL;
739 }
740
741 static int topic_pool_entry_hash(const void *obj, const int flags)
742 {
743         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
744         return ast_str_case_hash(topic_name);
745 }
746
747 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
748 {
749         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
750         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
751         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
752 }
753
754 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
755 {
756         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
757         if (!pool) {
758                 return NULL;
759         }
760         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
761         ao2_ref(pooled_topic, +1);
762         pool->pool_topic = pooled_topic;
763
764         ao2_ref(pool, +1);
765         return pool;
766 }
767
768 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
769 {
770         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
771         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
772         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
773
774         if (topic_pool_entry) {
775                 return topic_pool_entry->topic;
776         }
777
778         topic_pool_entry = topic_pool_entry_alloc();
779
780         if (!topic_pool_entry) {
781                 return NULL;
782         }
783
784         topic_pool_entry->topic = stasis_topic_create(topic_name);
785         if (!topic_pool_entry->topic) {
786                 return NULL;
787         }
788
789         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
790         if (!topic_pool_entry->forward) {
791                 return NULL;
792         }
793
794         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
795
796         return topic_pool_entry->topic;
797 }
798
799 void stasis_log_bad_type_access(const char *name)
800 {
801         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
802 }
803
804 /*! \brief Cleanup function for graceful shutdowns */
805 static void stasis_cleanup(void)
806 {
807         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
808 }
809
810 int stasis_init(void)
811 {
812         int cache_init;
813
814         /* Be sure the types are cleaned up after the message bus */
815         ast_register_cleanup(stasis_cleanup);
816
817         if (stasis_wait_init() != 0) {
818                 ast_log(LOG_ERROR, "Stasis initialization failed\n");
819                 return -1;
820         }
821
822         cache_init = stasis_cache_init();
823         if (cache_init != 0) {
824                 return -1;
825         }
826
827         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
828                 return -1;
829         }
830
831         return 0;
832 }