Stasis: address refcount races; implementation comments
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/threadpool.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41
42 /*!
43  * \page stasis-impl Stasis Implementation Notes
44  *
45  * \par Reference counting
46  *
47  * Stasis introduces a number of objects, which are tightly related to one
48  * another. Because we rely on ref-counting for memory management, understanding
49  * these relationships is important to understanding this code.
50  *
51  * \code{.txt}
52  *
53  *   stasis_topic <----> stasis_subscription
54  *             ^          ^
55  *              \        /
56  *               \      /
57  *               dispatch
58  *                  |
59  *                  |
60  *                  v
61  *            stasis_message
62  *                  |
63  *                  |
64  *                  v
65  *          stasis_message_type
66  *
67  * \endcode
68  *
69  * The most troubling thing in this chart is the cyclic reference between
70  * stasis_topic and stasis_subscription. This is both unfortunate, and
71  * necessary. Topics need the subscription in order to dispatch messages;
72  * subscriptions need the topics to unsubscribe and check subscription status.
73  *
74  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
75  * topic's reference to a subscription. When the subcription is destroyed, it
76  * will remove its reference to the topic.
77  *
78  * This means that until a subscription has be explicitly unsubscribed, it will
79  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
80  * The destructors of both have assertions regarding this to catch ref-counting
81  * problems where a subscription or topic has had an extra ao2_cleanup().
82  *
83  * The \ref dispatch object is a transient object, which is posted to a
84  * subscription's taskprocessor to send a message to the subscriber. They have
85  * short life cycles, allocated on one thread, destroyed on another.
86  *
87  * During shutdown, or the deletion of a domain object, there are a flurry of
88  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
89  * are processed. Any one of these cleanups could be the one to actually destroy
90  * a given object, so care must be taken to ensure that an object isn't
91  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
92  * that might happen when a RAII_VAR() goes out of scope.
93  *
94  * \par Typical life cycles
95  *
96  *  \li stasis_topic - There are several topics which live for the duration of
97  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
98  *      are actually fed by shorter-lived topics whose lifetime is associated
99  *      with some domain object (like ast_channel_topic() for a given
100  *      ast_channel).
101  *
102  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
103  *      topics, for similar reasons.
104  *
105  *  \li dispatch - Very short lived; just long enough to post a message to a
106  *      subscriber.
107  *
108  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
109  *      irrelevant. Messages are strictly data and have no behavior associated
110  *      with them, so it doesn't really matter if/when they are destroyed. By
111  *      design, a component could hold a ref to a message forever without any
112  *      ill consequences (aside from consuming more memory).
113  *
114  *  \li stasis_message_type - Long life cycles, typically only destroyed on
115  *      module unloading or _clean_ process exit.
116  *
117  * \par Subscriber shutdown sequencing
118  *
119  * Subscribers are sensitive to shutdown sequencing, specifically in how the
120  * reference message types. This is fully detailed on the wiki at
121  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
122  *
123  * In short, the lifetime of the \a data (and \a callback, if in a module) must
124  * be held until the stasis_subscription_final_message() has been received.
125  * Depending on the structure of the subscriber code, this can be handled by
126  * using stasis_subscription_final_message() to free resources on the final
127  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
128  * block until the unsubscribe has completed.
129  */
130
131 /*! Initial size of the subscribers list. */
132 #define INITIAL_SUBSCRIBERS_MAX 4
133
134 /*! The number of buckets to use for topic pools */
135 #define TOPIC_POOL_BUCKETS 57
136
137 /*! Threadpool for dispatching notifications to subscribers */
138 static struct ast_threadpool *pool;
139
140 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
141
142 /*! \internal */
143 struct stasis_topic {
144         char *name;
145         /*! Variable length array of the subscribers */
146         struct stasis_subscription **subscribers;
147         /*! Allocated length of the subscribers array */
148         size_t num_subscribers_max;
149         /*! Current size of the subscribers array */
150         size_t num_subscribers_current;
151 };
152
153 /* Forward declarations for the tightly-coupled subscription object */
154 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
155
156 static void topic_dtor(void *obj)
157 {
158         struct stasis_topic *topic = obj;
159
160         /* Subscribers hold a reference to topics, so they should all be
161          * unsubscribed before we get here. */
162         ast_assert(topic->num_subscribers_current == 0);
163         ast_free(topic->name);
164         topic->name = NULL;
165         ast_free(topic->subscribers);
166         topic->subscribers = NULL;
167 }
168
169 struct stasis_topic *stasis_topic_create(const char *name)
170 {
171         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
172
173         topic = ao2_alloc(sizeof(*topic), topic_dtor);
174
175         if (!topic) {
176                 return NULL;
177         }
178
179         topic->name = ast_strdup(name);
180         if (!topic->name) {
181                 return NULL;
182         }
183
184         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
185         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
186         if (!topic->subscribers) {
187                 return NULL;
188         }
189
190         ao2_ref(topic, +1);
191         return topic;
192 }
193
194 const char *stasis_topic_name(const struct stasis_topic *topic)
195 {
196         return topic->name;
197 }
198
199 /*! \internal */
200 struct stasis_subscription {
201         /*! Unique ID for this subscription */
202         char uniqueid[AST_UUID_STR_LEN];
203         /*! Topic subscribed to. */
204         struct stasis_topic *topic;
205         /*! Mailbox for processing incoming messages. */
206         struct ast_taskprocessor *mailbox;
207         /*! Callback function for incoming message processing. */
208         stasis_subscription_cb callback;
209         /*! Data pointer to be handed to the callback. */
210         void *data;
211
212         /*! Lock for completion flags \c final_message_{rxed,processed}. */
213         ast_mutex_t join_lock;
214         /*! Condition for joining with subscription. */
215         ast_cond_t join_cond;
216         /*! Flag set when final message for sub has been received.
217          *  Be sure join_lock is held before reading/setting. */
218         int final_message_rxed;
219         /*! Flag set when final message for sub has been processed.
220          *  Be sure join_lock is held before reading/setting. */
221         int final_message_processed;
222 };
223
224 static void subscription_dtor(void *obj)
225 {
226         struct stasis_subscription *sub = obj;
227
228         /* Subscriptions need to be manually unsubscribed before destruction
229          * b/c there's a cyclic reference between topics and subscriptions */
230         ast_assert(!stasis_subscription_is_subscribed(sub));
231         /* If there are any messages in flight to this subscription; that would
232          * be bad. */
233         ast_assert(stasis_subscription_is_done(sub));
234
235         ao2_cleanup(sub->topic);
236         sub->topic = NULL;
237         ast_taskprocessor_unreference(sub->mailbox);
238         sub->mailbox = NULL;
239         ast_mutex_destroy(&sub->join_lock);
240         ast_cond_destroy(&sub->join_cond);
241 }
242
243 /*!
244  * \brief Invoke the subscription's callback.
245  * \param sub Subscription to invoke.
246  * \param topic Topic message was published to.
247  * \param message Message to send.
248  */
249 static void subscription_invoke(struct stasis_subscription *sub,
250                                   struct stasis_topic *topic,
251                                   struct stasis_message *message)
252 {
253         /* Notify that the final message has been received */
254         if (stasis_subscription_final_message(sub, message)) {
255                 SCOPED_MUTEX(lock, &sub->join_lock);
256                 sub->final_message_rxed = 1;
257                 ast_cond_signal(&sub->join_cond);
258         }
259
260         /* Since sub is mostly immutable, no need to lock sub */
261         sub->callback(sub->data, sub, topic, message);
262
263         /* Notify that the final message has been processed */
264         if (stasis_subscription_final_message(sub, message)) {
265                 SCOPED_MUTEX(lock, &sub->join_lock);
266                 sub->final_message_processed = 1;
267                 ast_cond_signal(&sub->join_cond);
268         }
269 }
270
271 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
272
273 struct stasis_subscription *internal_stasis_subscribe(
274         struct stasis_topic *topic,
275         stasis_subscription_cb callback,
276         void *data,
277         int needs_mailbox)
278 {
279         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
280
281         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
282         if (!sub) {
283                 return NULL;
284         }
285
286         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
287
288         if (needs_mailbox) {
289                 sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
290                 if (!sub->mailbox) {
291                         return NULL;
292                 }
293         }
294
295         ao2_ref(topic, +1);
296         sub->topic = topic;
297         sub->callback = callback;
298         sub->data = data;
299         ast_mutex_init(&sub->join_lock);
300         ast_cond_init(&sub->join_cond, NULL);
301
302         if (topic_add_subscription(topic, sub) != 0) {
303                 return NULL;
304         }
305         send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
306
307         ao2_ref(sub, +1);
308         return sub;
309 }
310
311 struct stasis_subscription *stasis_subscribe(
312         struct stasis_topic *topic,
313         stasis_subscription_cb callback,
314         void *data)
315 {
316         return internal_stasis_subscribe(topic, callback, data, 1);
317 }
318
319 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
320 {
321         if (sub) {
322                 size_t i;
323                 /* The subscription may be the last ref to this topic. Hold
324                  * the topic ref open until after the unlock. */
325                 RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
326                         ao2_cleanup);
327                 SCOPED_AO2LOCK(lock_topic, topic);
328
329                 for (i = 0; i < topic->num_subscribers_current; ++i) {
330                         if (topic->subscribers[i] == sub) {
331                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
332                                 /* swap [i] with last entry; remove last entry */
333                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
334                                 /* Unsubscribing unrefs the subscription */
335                                 ao2_cleanup(sub);
336                                 return NULL;
337                         }
338                 }
339
340                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
341         }
342         return NULL;
343 }
344
345 void stasis_subscription_join(struct stasis_subscription *subscription)
346 {
347         if (subscription) {
348                 SCOPED_MUTEX(lock, &subscription->join_lock);
349                 /* Wait until the processed flag has been set */
350                 while (!subscription->final_message_processed) {
351                         ast_cond_wait(&subscription->join_cond,
352                                 &subscription->join_lock);
353                 }
354         }
355 }
356
357 int stasis_subscription_is_done(struct stasis_subscription *subscription)
358 {
359         if (subscription) {
360                 SCOPED_MUTEX(lock, &subscription->join_lock);
361                 return subscription->final_message_rxed;
362         }
363
364         /* Null subscription is about as done as you can get */
365         return 1;
366 }
367
368 struct stasis_subscription *stasis_unsubscribe_and_join(
369         struct stasis_subscription *subscription)
370 {
371         if (!subscription) {
372                 return NULL;
373         }
374
375         /* Bump refcount to hold it past the unsubscribe */
376         ao2_ref(subscription, +1);
377         stasis_unsubscribe(subscription);
378         stasis_subscription_join(subscription);
379         /* Now decrement the refcount back */
380         ao2_cleanup(subscription);
381         return NULL;
382 }
383
384 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
385 {
386         if (sub) {
387                 size_t i;
388                 struct stasis_topic *topic = sub->topic;
389                 SCOPED_AO2LOCK(lock_topic, topic);
390
391                 for (i = 0; i < topic->num_subscribers_current; ++i) {
392                         if (topic->subscribers[i] == sub) {
393                                 return 1;
394                         }
395                 }
396         }
397
398         return 0;
399 }
400
401 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
402 {
403         return sub->uniqueid;
404 }
405
406 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
407 {
408         struct stasis_subscription_change *change;
409         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
410                 return 0;
411         }
412
413         change = stasis_message_data(msg);
414         if (strcmp("Unsubscribe", change->description)) {
415                 return 0;
416         }
417
418         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
419                 return 0;
420         }
421
422         return 1;
423 }
424
425 /*!
426  * \brief Add a subscriber to a topic.
427  * \param topic Topic
428  * \param sub Subscriber
429  * \return 0 on success
430  * \return Non-zero on error
431  */
432 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
433 {
434         struct stasis_subscription **subscribers;
435         SCOPED_AO2LOCK(lock, topic);
436
437         /* Increase list size, if needed */
438         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
439                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
440                 if (!subscribers) {
441                         return -1;
442                 }
443                 topic->subscribers = subscribers;
444                 topic->num_subscribers_max *= 2;
445         }
446
447         /* The reference from the topic to the subscription is shared with
448          * the owner of the subscription, which will explicitly unsubscribe
449          * to release it.
450          *
451          * If we bumped the refcount here, the owner would have to unsubscribe
452          * and cleanup, which is a bit awkward. */
453         topic->subscribers[topic->num_subscribers_current++] = sub;
454         return 0;
455 }
456
457 /*!
458  * \internal
459  * \brief Information needed to dispatch a message to a subscription
460  */
461 struct dispatch {
462         /*! Topic message was published to */
463         struct stasis_topic *topic;
464         /*! The message itself */
465         struct stasis_message *message;
466         /*! Subscription receiving the message */
467         struct stasis_subscription *sub;
468 };
469
470 static void dispatch_dtor(void *data)
471 {
472         struct dispatch *dispatch = data;
473         ao2_cleanup(dispatch->topic);
474         ao2_cleanup(dispatch->message);
475         ao2_cleanup(dispatch->sub);
476 }
477
478 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
479 {
480         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
481
482         ast_assert(topic != NULL);
483         ast_assert(message != NULL);
484         ast_assert(sub != NULL);
485
486         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
487         if (!dispatch) {
488                 return NULL;
489         }
490
491         dispatch->topic = topic;
492         ao2_ref(topic, +1);
493
494         dispatch->message = message;
495         ao2_ref(message, +1);
496
497         dispatch->sub = sub;
498         ao2_ref(sub, +1);
499
500         ao2_ref(dispatch, +1);
501         return dispatch;
502 }
503
504 /*!
505  * \brief Dispatch a message to a subscriber
506  * \param data \ref dispatch object
507  * \return 0
508  */
509 static int dispatch_exec(void *data)
510 {
511         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
512
513         subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
514
515         return 0;
516 }
517
518 void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
519 {
520         size_t i;
521         /* The topic may be unref'ed by the subscription invocation.
522          * Make sure we hold onto a reference while dispatching. */
523         RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
524                 ao2_cleanup);
525         SCOPED_AO2LOCK(lock, topic);
526
527         ast_assert(topic != NULL);
528         ast_assert(publisher_topic != NULL);
529         ast_assert(message != NULL);
530
531         for (i = 0; i < topic->num_subscribers_current; ++i) {
532                 struct stasis_subscription *sub = topic->subscribers[i];
533
534                 ast_assert(sub != NULL);
535
536                 if (sub->mailbox) {
537                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
538
539                         dispatch = dispatch_create(publisher_topic, message, sub);
540                         if (!dispatch) {
541                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
542                                 break;
543                         }
544
545                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
546                                 /* Ownership transferred to mailbox.
547                                  * Don't increment ref, b/c the task processor
548                                  * may have already gotten rid of the object.
549                                  */
550                                 dispatch = NULL;
551                         }
552                 } else {
553                         /* Dispatch directly */
554                         subscription_invoke(sub, publisher_topic, message);
555                 }
556         }
557 }
558
559 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
560 {
561         stasis_forward_message(topic, topic, message);
562 }
563
564 /*! \brief Forwarding subscriber */
565 static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
566 {
567         struct stasis_topic *to_topic = data;
568         stasis_forward_message(to_topic, topic, message);
569
570         if (stasis_subscription_final_message(sub, message)) {
571                 ao2_cleanup(to_topic);
572         }
573 }
574
575 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
576 {
577         struct stasis_subscription *sub;
578         if (!from_topic || !to_topic) {
579                 return NULL;
580         }
581
582         /* Forwarding subscriptions should dispatch directly instead of having a
583          * mailbox. Otherwise, messages forwarded to the same topic from
584          * different topics may get reordered. Which is bad.
585          */
586         sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
587         if (sub) {
588                 /* hold a ref to to_topic for this forwarding subscription */
589                 ao2_ref(to_topic, +1);
590         }
591         return sub;
592 }
593
594 static void subscription_change_dtor(void *obj)
595 {
596         struct stasis_subscription_change *change = obj;
597         ast_string_field_free_memory(change);
598         ao2_cleanup(change->topic);
599 }
600
601 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
602 {
603         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
604
605         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
606         if (ast_string_field_init(change, 128)) {
607                 return NULL;
608         }
609
610         ast_string_field_set(change, uniqueid, uniqueid);
611         ast_string_field_set(change, description, description);
612         ao2_ref(topic, +1);
613         change->topic = topic;
614
615         ao2_ref(change, +1);
616         return change;
617 }
618
619 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
620 {
621         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
622         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
623
624         change = subscription_change_alloc(topic, uniqueid, description);
625
626         if (!change) {
627                 return;
628         }
629
630         msg = stasis_message_create(stasis_subscription_change_type(), change);
631
632         if (!msg) {
633                 return;
634         }
635
636         stasis_publish(topic, msg);
637 }
638
639 struct topic_pool_entry {
640         struct stasis_subscription *forward;
641         struct stasis_topic *topic;
642 };
643
644 static void topic_pool_entry_dtor(void *obj)
645 {
646         struct topic_pool_entry *entry = obj;
647         entry->forward = stasis_unsubscribe(entry->forward);
648         ao2_cleanup(entry->topic);
649         entry->topic = NULL;
650 }
651
652 static struct topic_pool_entry *topic_pool_entry_alloc(void)
653 {
654         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
655 }
656
657 struct stasis_topic_pool {
658         struct ao2_container *pool_container;
659         struct stasis_topic *pool_topic;
660 };
661
662 static void topic_pool_dtor(void *obj)
663 {
664         struct stasis_topic_pool *pool = obj;
665         ao2_cleanup(pool->pool_container);
666         pool->pool_container = NULL;
667         ao2_cleanup(pool->pool_topic);
668         pool->pool_topic = NULL;
669 }
670
671 static int topic_pool_entry_hash(const void *obj, const int flags)
672 {
673         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
674         return ast_str_case_hash(topic_name);
675 }
676
677 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
678 {
679         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
680         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
681         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
682 }
683
684 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
685 {
686         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
687         if (!pool) {
688                 return NULL;
689         }
690         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
691         ao2_ref(pooled_topic, +1);
692         pool->pool_topic = pooled_topic;
693
694         ao2_ref(pool, +1);
695         return pool;
696 }
697
698 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
699 {
700         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
701         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
702         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
703
704         if (topic_pool_entry) {
705                 return topic_pool_entry->topic;
706         }
707
708         topic_pool_entry = topic_pool_entry_alloc();
709
710         if (!topic_pool_entry) {
711                 return NULL;
712         }
713
714         topic_pool_entry->topic = stasis_topic_create(topic_name);
715         if (!topic_pool_entry->topic) {
716                 return NULL;
717         }
718
719         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
720         if (!topic_pool_entry->forward) {
721                 return NULL;
722         }
723
724         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
725
726         return topic_pool_entry->topic;
727 }
728
729 void stasis_log_bad_type_access(const char *name)
730 {
731         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
732 }
733
734 /*! \brief Shutdown function */
735 static void stasis_exit(void)
736 {
737         ast_threadpool_shutdown(pool);
738         pool = NULL;
739 }
740
741 /*! \brief Cleanup function for graceful shutdowns */
742 static void stasis_cleanup(void)
743 {
744         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
745 }
746
747 int stasis_init(void)
748 {
749         int cache_init;
750
751         struct ast_threadpool_options opts;
752
753         /* Be sure the types are cleaned up after the message bus */
754         ast_register_cleanup(stasis_cleanup);
755         ast_register_atexit(stasis_exit);
756
757         if (stasis_config_init() != 0) {
758                 ast_log(LOG_ERROR, "Stasis configuration failed\n");
759                 return -1;
760         }
761
762         if (stasis_wait_init() != 0) {
763                 ast_log(LOG_ERROR, "Stasis initialization failed\n");
764                 return -1;
765         }
766
767         if (pool) {
768                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
769                 return -1;
770         }
771
772         stasis_config_get_threadpool_options(&opts);
773         ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
774                 opts.initial_size, opts.max_size, opts.idle_timeout);
775         pool = ast_threadpool_create("stasis-core", NULL, &opts);
776         if (!pool) {
777                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
778                 return -1;
779         }
780
781         cache_init = stasis_cache_init();
782         if (cache_init != 0) {
783                 return -1;
784         }
785
786         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
787                 return -1;
788         }
789
790         return 0;
791 }