stasis: use ao2_t_alloc for certain object allocators
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/vector.h"
41 #include "asterisk/stasis_channels.h"
42 #include "asterisk/stasis_bridges.h"
43 #include "asterisk/stasis_endpoints.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                         </see-also>
61                 </managerEventInstance>
62         </managerEvent>
63 ***/
64
65 /*!
66  * \page stasis-impl Stasis Implementation Notes
67  *
68  * \par Reference counting
69  *
70  * Stasis introduces a number of objects, which are tightly related to one
71  * another. Because we rely on ref-counting for memory management, understanding
72  * these relationships is important to understanding this code.
73  *
74  * \code{.txt}
75  *
76  *   stasis_topic <----> stasis_subscription
77  *             ^          ^
78  *              \        /
79  *               \      /
80  *               dispatch
81  *                  |
82  *                  |
83  *                  v
84  *            stasis_message
85  *                  |
86  *                  |
87  *                  v
88  *          stasis_message_type
89  *
90  * \endcode
91  *
92  * The most troubling thing in this chart is the cyclic reference between
93  * stasis_topic and stasis_subscription. This is both unfortunate, and
94  * necessary. Topics need the subscription in order to dispatch messages;
95  * subscriptions need the topics to unsubscribe and check subscription status.
96  *
97  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
98  * topic's reference to a subscription. When the subcription is destroyed, it
99  * will remove its reference to the topic.
100  *
101  * This means that until a subscription has be explicitly unsubscribed, it will
102  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
103  * The destructors of both have assertions regarding this to catch ref-counting
104  * problems where a subscription or topic has had an extra ao2_cleanup().
105  *
106  * The \ref dispatch object is a transient object, which is posted to a
107  * subscription's taskprocessor to send a message to the subscriber. They have
108  * short life cycles, allocated on one thread, destroyed on another.
109  *
110  * During shutdown, or the deletion of a domain object, there are a flurry of
111  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
112  * are processed. Any one of these cleanups could be the one to actually destroy
113  * a given object, so care must be taken to ensure that an object isn't
114  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
115  * that might happen when a RAII_VAR() goes out of scope.
116  *
117  * \par Typical life cycles
118  *
119  *  \li stasis_topic - There are several topics which live for the duration of
120  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
121  *      are actually fed by shorter-lived topics whose lifetime is associated
122  *      with some domain object (like ast_channel_topic() for a given
123  *      ast_channel).
124  *
125  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
126  *      topics, for similar reasons.
127  *
128  *  \li dispatch - Very short lived; just long enough to post a message to a
129  *      subscriber.
130  *
131  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
132  *      irrelevant. Messages are strictly data and have no behavior associated
133  *      with them, so it doesn't really matter if/when they are destroyed. By
134  *      design, a component could hold a ref to a message forever without any
135  *      ill consequences (aside from consuming more memory).
136  *
137  *  \li stasis_message_type - Long life cycles, typically only destroyed on
138  *      module unloading or _clean_ process exit.
139  *
140  * \par Subscriber shutdown sequencing
141  *
142  * Subscribers are sensitive to shutdown sequencing, specifically in how the
143  * reference message types. This is fully detailed on the wiki at
144  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
145  *
146  * In short, the lifetime of the \a data (and \a callback, if in a module) must
147  * be held until the stasis_subscription_final_message() has been received.
148  * Depending on the structure of the subscriber code, this can be handled by
149  * using stasis_subscription_final_message() to free resources on the final
150  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
151  * block until the unsubscribe has completed.
152  */
153
154 /*! Initial size of the subscribers list. */
155 #define INITIAL_SUBSCRIBERS_MAX 4
156
157 /*! The number of buckets to use for topic pools */
158 #define TOPIC_POOL_BUCKETS 57
159
160 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
161
162 /*! \internal */
163 struct stasis_topic {
164         char *name;
165         /*! Variable length array of the subscribers */
166         AST_VECTOR(, struct stasis_subscription *) subscribers;
167
168         /*! Topics forwarding into this topic */
169         AST_VECTOR(, struct stasis_topic *) upstream_topics;
170 };
171
172 /* Forward declarations for the tightly-coupled subscription object */
173 static int topic_add_subscription(struct stasis_topic *topic,
174         struct stasis_subscription *sub);
175
176 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
177
178 /*! \brief Lock two topics. */
179 #define topic_lock_both(topic1, topic2) \
180         do { \
181                 ao2_lock(topic1); \
182                 while (ao2_trylock(topic2)) { \
183                         AO2_DEADLOCK_AVOIDANCE(topic1); \
184                 } \
185         } while (0)
186
187 static void topic_dtor(void *obj)
188 {
189         struct stasis_topic *topic = obj;
190
191         /* Subscribers hold a reference to topics, so they should all be
192          * unsubscribed before we get here. */
193         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
194
195         ast_free(topic->name);
196         topic->name = NULL;
197
198         AST_VECTOR_FREE(&topic->subscribers);
199         AST_VECTOR_FREE(&topic->upstream_topics);
200 }
201
202 struct stasis_topic *stasis_topic_create(const char *name)
203 {
204         struct stasis_topic *topic;
205         int res = 0;
206
207         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
208         if (!topic) {
209                 return NULL;
210         }
211
212         topic->name = ast_strdup(name);
213         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
214         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
215         if (!topic->name || res) {
216                 ao2_cleanup(topic);
217                 return NULL;
218         }
219
220         return topic;
221 }
222
223 const char *stasis_topic_name(const struct stasis_topic *topic)
224 {
225         return topic->name;
226 }
227
228 /*! \internal */
229 struct stasis_subscription {
230         /*! Unique ID for this subscription */
231         char uniqueid[AST_UUID_STR_LEN];
232         /*! Topic subscribed to. */
233         struct stasis_topic *topic;
234         /*! Mailbox for processing incoming messages. */
235         struct ast_taskprocessor *mailbox;
236         /*! Callback function for incoming message processing. */
237         stasis_subscription_cb callback;
238         /*! Data pointer to be handed to the callback. */
239         void *data;
240
241         /*! Condition for joining with subscription. */
242         ast_cond_t join_cond;
243         /*! Flag set when final message for sub has been received.
244          *  Be sure join_lock is held before reading/setting. */
245         int final_message_rxed;
246         /*! Flag set when final message for sub has been processed.
247          *  Be sure join_lock is held before reading/setting. */
248         int final_message_processed;
249 };
250
251 static void subscription_dtor(void *obj)
252 {
253         struct stasis_subscription *sub = obj;
254
255         /* Subscriptions need to be manually unsubscribed before destruction
256          * b/c there's a cyclic reference between topics and subscriptions */
257         ast_assert(!stasis_subscription_is_subscribed(sub));
258         /* If there are any messages in flight to this subscription; that would
259          * be bad. */
260         ast_assert(stasis_subscription_is_done(sub));
261
262         ao2_cleanup(sub->topic);
263         sub->topic = NULL;
264         ast_taskprocessor_unreference(sub->mailbox);
265         sub->mailbox = NULL;
266         ast_cond_destroy(&sub->join_cond);
267 }
268
269 /*!
270  * \brief Invoke the subscription's callback.
271  * \param sub Subscription to invoke.
272  * \param topic Topic message was published to.
273  * \param message Message to send.
274  */
275 static void subscription_invoke(struct stasis_subscription *sub,
276                                   struct stasis_message *message)
277 {
278         /* Notify that the final message has been received */
279         if (stasis_subscription_final_message(sub, message)) {
280                 SCOPED_AO2LOCK(lock, sub);
281
282                 sub->final_message_rxed = 1;
283                 ast_cond_signal(&sub->join_cond);
284         }
285
286         /* Since sub is mostly immutable, no need to lock sub */
287         sub->callback(sub->data, sub, message);
288
289         /* Notify that the final message has been processed */
290         if (stasis_subscription_final_message(sub, message)) {
291                 SCOPED_AO2LOCK(lock, sub);
292
293                 sub->final_message_processed = 1;
294                 ast_cond_signal(&sub->join_cond);
295         }
296 }
297
298 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
299 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
300
301 struct stasis_subscription *internal_stasis_subscribe(
302         struct stasis_topic *topic,
303         stasis_subscription_cb callback,
304         void *data,
305         int needs_mailbox)
306 {
307         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
308
309         if (!topic) {
310                 return NULL;
311         }
312
313         /* The ao2 lock is used for join_cond. */
314         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
315         if (!sub) {
316                 return NULL;
317         }
318
319         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
320
321         if (needs_mailbox) {
322                 /* With a small number of subscribers, a thread-per-sub is
323                  * acceptable. If our usage changes so that we have larger
324                  * numbers of subscribers, we'll probably want to consider
325                  * a threadpool. We had that originally, but with so few
326                  * subscribers it was actually a performance loss instead of
327                  * a gain.
328                  */
329                 sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
330                         TPS_REF_DEFAULT);
331                 if (!sub->mailbox) {
332                         return NULL;
333                 }
334                 ast_taskprocessor_set_local(sub->mailbox, sub);
335                 /* Taskprocessor has a reference */
336                 ao2_ref(sub, +1);
337         }
338
339         ao2_ref(topic, +1);
340         sub->topic = topic;
341         sub->callback = callback;
342         sub->data = data;
343         ast_cond_init(&sub->join_cond, NULL);
344
345         if (topic_add_subscription(topic, sub) != 0) {
346                 return NULL;
347         }
348         send_subscription_subscribe(topic, sub);
349
350         ao2_ref(sub, +1);
351         return sub;
352 }
353
354 struct stasis_subscription *stasis_subscribe(
355         struct stasis_topic *topic,
356         stasis_subscription_cb callback,
357         void *data)
358 {
359         return internal_stasis_subscribe(topic, callback, data, 1);
360 }
361
362 static int sub_cleanup(void *data)
363 {
364         struct stasis_subscription *sub = data;
365         ao2_cleanup(sub);
366         return 0;
367 }
368
369 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
370 {
371         /* The subscription may be the last ref to this topic. Hold
372          * the topic ref open until after the unlock. */
373         RAII_VAR(struct stasis_topic *, topic,
374                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
375
376         if (!sub) {
377                 return NULL;
378         }
379
380         /* We have to remove the subscription first, to ensure the unsubscribe
381          * is the final message */
382         if (topic_remove_subscription(sub->topic, sub) != 0) {
383                 ast_log(LOG_ERROR,
384                         "Internal error: subscription has invalid topic\n");
385                 return NULL;
386         }
387
388         /* Now let everyone know about the unsubscribe */
389         send_subscription_unsubscribe(topic, sub);
390
391         /* When all that's done, remove the ref the mailbox has on the sub */
392         if (sub->mailbox) {
393                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
394         }
395
396         /* Unsubscribing unrefs the subscription */
397         ao2_cleanup(sub);
398         return NULL;
399 }
400
401 void stasis_subscription_join(struct stasis_subscription *subscription)
402 {
403         if (subscription) {
404                 SCOPED_AO2LOCK(lock, subscription);
405
406                 /* Wait until the processed flag has been set */
407                 while (!subscription->final_message_processed) {
408                         ast_cond_wait(&subscription->join_cond,
409                                 ao2_object_get_lockaddr(subscription));
410                 }
411         }
412 }
413
414 int stasis_subscription_is_done(struct stasis_subscription *subscription)
415 {
416         if (subscription) {
417                 SCOPED_AO2LOCK(lock, subscription);
418
419                 return subscription->final_message_rxed;
420         }
421
422         /* Null subscription is about as done as you can get */
423         return 1;
424 }
425
426 struct stasis_subscription *stasis_unsubscribe_and_join(
427         struct stasis_subscription *subscription)
428 {
429         if (!subscription) {
430                 return NULL;
431         }
432
433         /* Bump refcount to hold it past the unsubscribe */
434         ao2_ref(subscription, +1);
435         stasis_unsubscribe(subscription);
436         stasis_subscription_join(subscription);
437         /* Now decrement the refcount back */
438         ao2_cleanup(subscription);
439         return NULL;
440 }
441
442 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
443 {
444         if (sub) {
445                 size_t i;
446                 struct stasis_topic *topic = sub->topic;
447                 SCOPED_AO2LOCK(lock_topic, topic);
448
449                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
450                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
451                                 return 1;
452                         }
453                 }
454         }
455
456         return 0;
457 }
458
459 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
460 {
461         return sub->uniqueid;
462 }
463
464 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
465 {
466         struct stasis_subscription_change *change;
467
468         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
469                 return 0;
470         }
471
472         change = stasis_message_data(msg);
473         if (strcmp("Unsubscribe", change->description)) {
474                 return 0;
475         }
476
477         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
478                 return 0;
479         }
480
481         return 1;
482 }
483
484 /*!
485  * \brief Add a subscriber to a topic.
486  * \param topic Topic
487  * \param sub Subscriber
488  * \return 0 on success
489  * \return Non-zero on error
490  */
491 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
492 {
493         size_t idx;
494         SCOPED_AO2LOCK(lock, topic);
495
496         /* The reference from the topic to the subscription is shared with
497          * the owner of the subscription, which will explicitly unsubscribe
498          * to release it.
499          *
500          * If we bumped the refcount here, the owner would have to unsubscribe
501          * and cleanup, which is a bit awkward. */
502         AST_VECTOR_APPEND(&topic->subscribers, sub);
503
504         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
505                 topic_add_subscription(
506                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
507         }
508
509         return 0;
510 }
511
512 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
513 {
514         size_t idx;
515         SCOPED_AO2LOCK(lock_topic, topic);
516
517         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
518                 topic_remove_subscription(
519                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
520         }
521
522         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
523                 AST_VECTOR_ELEM_CLEANUP_NOOP);
524 }
525
526 /*!
527  * \internal \brief Dispatch a message to a subscriber asynchronously
528  * \param local \ref ast_taskprocessor_local object
529  * \return 0
530  */
531 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
532 {
533         struct stasis_subscription *sub = local->local_data;
534         struct stasis_message *message = local->data;
535
536         subscription_invoke(sub, message);
537         ao2_cleanup(message);
538
539         return 0;
540 }
541
542 /*!
543  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
544  * a published message to a subscriber
545  */
546 struct sync_task_data {
547         ast_mutex_t lock;
548         ast_cond_t cond;
549         int complete;
550         void *task_data;
551 };
552
553 /*!
554  * \internal \brief Dispatch a message to a subscriber synchronously
555  * \param local \ref ast_taskprocessor_local object
556  * \return 0
557  */
558 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
559 {
560         struct stasis_subscription *sub = local->local_data;
561         struct sync_task_data *std = local->data;
562         struct stasis_message *message = std->task_data;
563
564         subscription_invoke(sub, message);
565         ao2_cleanup(message);
566
567         ast_mutex_lock(&std->lock);
568         std->complete = 1;
569         ast_cond_signal(&std->cond);
570         ast_mutex_unlock(&std->lock);
571
572         return 0;
573 }
574
575 /*!
576  * \internal \brief Dispatch a message to a subscriber
577  * \param sub The subscriber to dispatch to
578  * \param message The message to send
579  * \param synchronous If non-zero, synchronize on the subscriber receiving
580  * the message
581  */
582 static void dispatch_message(struct stasis_subscription *sub,
583         struct stasis_message *message,
584         int synchronous)
585 {
586         if (!sub->mailbox) {
587                 /* Dispatch directly */
588                 subscription_invoke(sub, message);
589                 return;
590         }
591
592         /* Bump the message for the taskprocessor push. This will get de-ref'd
593          * by the task processor callback.
594          */
595         ao2_bump(message);
596         if (!synchronous) {
597                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
598                         /* Push failed; ugh. */
599                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
600                         ao2_cleanup(message);
601                 }
602         } else {
603                 struct sync_task_data std;
604
605                 ast_mutex_init(&std.lock);
606                 ast_cond_init(&std.cond, NULL);
607                 std.complete = 0;
608                 std.task_data = message;
609
610                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
611                         /* Push failed; ugh. */
612                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
613                         ao2_cleanup(message);
614                         ast_mutex_destroy(&std.lock);
615                         ast_cond_destroy(&std.cond);
616                         return;
617                 }
618
619                 ast_mutex_lock(&std.lock);
620                 while (!std.complete) {
621                         ast_cond_wait(&std.cond, &std.lock);
622                 }
623                 ast_mutex_unlock(&std.lock);
624
625                 ast_mutex_destroy(&std.lock);
626                 ast_cond_destroy(&std.cond);
627         }
628 }
629
630 /*!
631  * \internal \brief Publish a message to a topic's subscribers
632  * \brief topic The topic to publish to
633  * \brief message The message to publish
634  * \brief sync_sub An optional subscriber of the topic to publish synchronously
635  * to
636  */
637 static void publish_msg(struct stasis_topic *topic,
638         struct stasis_message *message, struct stasis_subscription *sync_sub)
639 {
640         size_t i;
641
642         ast_assert(topic != NULL);
643         ast_assert(message != NULL);
644
645         /*
646          * The topic may be unref'ed by the subscription invocation.
647          * Make sure we hold onto a reference while dispatching.
648          */
649         ao2_ref(topic, +1);
650         ao2_lock(topic);
651         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
652                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
653
654                 ast_assert(sub != NULL);
655
656                 dispatch_message(sub, message, (sub == sync_sub));
657         }
658         ao2_unlock(topic);
659         ao2_ref(topic, -1);
660 }
661
662 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
663 {
664         publish_msg(topic, message, NULL);
665 }
666
667 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
668 {
669         ast_assert(sub != NULL);
670
671         publish_msg(sub->topic, message, sub);
672 }
673
674 /*!
675  * \brief Forwarding information
676  *
677  * Any message posted to \a from_topic is forwarded to \a to_topic.
678  *
679  * In cases where both the \a from_topic and \a to_topic need to be locked,
680  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
681  */
682 struct stasis_forward {
683         /*! Originating topic */
684         struct stasis_topic *from_topic;
685         /*! Destination topic */
686         struct stasis_topic *to_topic;
687 };
688
689 static void forward_dtor(void *obj)
690 {
691         struct stasis_forward *forward = obj;
692
693         ao2_cleanup(forward->from_topic);
694         forward->from_topic = NULL;
695         ao2_cleanup(forward->to_topic);
696         forward->to_topic = NULL;
697 }
698
699 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
700 {
701         int idx;
702         struct stasis_topic *from;
703         struct stasis_topic *to;
704
705         if (!forward) {
706                 return NULL;
707         }
708
709         from = forward->from_topic;
710         to = forward->to_topic;
711
712         if (from && to) {
713                 topic_lock_both(to, from);
714                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
715                         AST_VECTOR_ELEM_CLEANUP_NOOP);
716
717                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
718                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
719                 }
720                 ao2_unlock(from);
721                 ao2_unlock(to);
722         }
723
724         ao2_cleanup(forward);
725
726         return NULL;
727 }
728
729 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
730         struct stasis_topic *to_topic)
731 {
732         int res;
733         size_t idx;
734         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
735
736         if (!from_topic || !to_topic) {
737                 return NULL;
738         }
739
740         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
741         if (!forward) {
742                 return NULL;
743         }
744
745         /* Forwards to ourselves are implicit. */
746         if (to_topic == from_topic) {
747                 return ao2_bump(forward);
748         }
749
750         forward->from_topic = ao2_bump(from_topic);
751         forward->to_topic = ao2_bump(to_topic);
752
753         topic_lock_both(to_topic, from_topic);
754         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
755         if (res != 0) {
756                 ao2_unlock(from_topic);
757                 ao2_unlock(to_topic);
758                 return NULL;
759         }
760
761         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
762                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
763         }
764         ao2_unlock(from_topic);
765         ao2_unlock(to_topic);
766
767         return ao2_bump(forward);
768 }
769
770 static void subscription_change_dtor(void *obj)
771 {
772         struct stasis_subscription_change *change = obj;
773
774         ast_string_field_free_memory(change);
775         ao2_cleanup(change->topic);
776 }
777
778 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
779 {
780         struct stasis_subscription_change *change;
781
782         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
783         if (!change || ast_string_field_init(change, 128)) {
784                 ao2_cleanup(change);
785                 return NULL;
786         }
787
788         ast_string_field_set(change, uniqueid, uniqueid);
789         ast_string_field_set(change, description, description);
790         ao2_ref(topic, +1);
791         change->topic = topic;
792
793         return change;
794 }
795
796 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
797 {
798         struct stasis_subscription_change *change;
799         struct stasis_message *msg;
800
801         /* This assumes that we have already unsubscribed */
802         ast_assert(stasis_subscription_is_subscribed(sub));
803
804         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
805         if (!change) {
806                 return;
807         }
808
809         msg = stasis_message_create(stasis_subscription_change_type(), change);
810         if (!msg) {
811                 ao2_cleanup(change);
812                 return;
813         }
814
815         stasis_publish(topic, msg);
816         ao2_cleanup(msg);
817         ao2_cleanup(change);
818 }
819
820 static void send_subscription_unsubscribe(struct stasis_topic *topic,
821         struct stasis_subscription *sub)
822 {
823         struct stasis_subscription_change *change;
824         struct stasis_message *msg;
825
826         /* This assumes that we have already unsubscribed */
827         ast_assert(!stasis_subscription_is_subscribed(sub));
828
829         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
830         if (!change) {
831                 return;
832         }
833
834         msg = stasis_message_create(stasis_subscription_change_type(), change);
835         if (!msg) {
836                 ao2_cleanup(change);
837                 return;
838         }
839
840         stasis_publish(topic, msg);
841
842         /* Now we have to dispatch to the subscription itself */
843         dispatch_message(sub, msg, 0);
844
845         ao2_cleanup(msg);
846         ao2_cleanup(change);
847 }
848
849 struct topic_pool_entry {
850         struct stasis_forward *forward;
851         struct stasis_topic *topic;
852 };
853
854 static void topic_pool_entry_dtor(void *obj)
855 {
856         struct topic_pool_entry *entry = obj;
857
858         entry->forward = stasis_forward_cancel(entry->forward);
859         ao2_cleanup(entry->topic);
860         entry->topic = NULL;
861 }
862
863 static struct topic_pool_entry *topic_pool_entry_alloc(void)
864 {
865         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
866                 AO2_ALLOC_OPT_LOCK_NOLOCK);
867 }
868
869 struct stasis_topic_pool {
870         struct ao2_container *pool_container;
871         struct stasis_topic *pool_topic;
872 };
873
874 static void topic_pool_dtor(void *obj)
875 {
876         struct stasis_topic_pool *pool = obj;
877
878         ao2_cleanup(pool->pool_container);
879         pool->pool_container = NULL;
880         ao2_cleanup(pool->pool_topic);
881         pool->pool_topic = NULL;
882 }
883
884 static int topic_pool_entry_hash(const void *obj, const int flags)
885 {
886         const struct topic_pool_entry *object;
887         const char *key;
888
889         switch (flags & OBJ_SEARCH_MASK) {
890         case OBJ_SEARCH_KEY:
891                 key = obj;
892                 break;
893         case OBJ_SEARCH_OBJECT:
894                 object = obj;
895                 key = stasis_topic_name(object->topic);
896                 break;
897         default:
898                 /* Hash can only work on something with a full key. */
899                 ast_assert(0);
900                 return 0;
901         }
902         return ast_str_case_hash(key);
903 }
904
905 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
906 {
907         const struct topic_pool_entry *object_left = obj;
908         const struct topic_pool_entry *object_right = arg;
909         const char *right_key = arg;
910         int cmp;
911
912         switch (flags & OBJ_SEARCH_MASK) {
913         case OBJ_SEARCH_OBJECT:
914                 right_key = stasis_topic_name(object_right->topic);
915                 /* Fall through */
916         case OBJ_SEARCH_KEY:
917                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
918                 break;
919         case OBJ_SEARCH_PARTIAL_KEY:
920                 /* Not supported by container */
921                 ast_assert(0);
922                 cmp = -1;
923                 break;
924         default:
925                 /*
926                  * What arg points to is specific to this traversal callback
927                  * and has no special meaning to astobj2.
928                  */
929                 cmp = 0;
930                 break;
931         }
932         if (cmp) {
933                 return 0;
934         }
935         /*
936          * At this point the traversal callback is identical to a sorted
937          * container.
938          */
939         return CMP_MATCH;
940 }
941
942 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
943 {
944         struct stasis_topic_pool *pool;
945
946         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
947         if (!pool) {
948                 return NULL;
949         }
950
951         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
952                 topic_pool_entry_hash, topic_pool_entry_cmp);
953         if (!pool->pool_container) {
954                 ao2_cleanup(pool);
955                 return NULL;
956         }
957         ao2_ref(pooled_topic, +1);
958         pool->pool_topic = pooled_topic;
959
960         return pool;
961 }
962
963 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
964 {
965         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
966         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
967
968         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
969         if (topic_pool_entry) {
970                 return topic_pool_entry->topic;
971         }
972
973         topic_pool_entry = topic_pool_entry_alloc();
974         if (!topic_pool_entry) {
975                 return NULL;
976         }
977
978         topic_pool_entry->topic = stasis_topic_create(topic_name);
979         if (!topic_pool_entry->topic) {
980                 return NULL;
981         }
982
983         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
984         if (!topic_pool_entry->forward) {
985                 return NULL;
986         }
987
988         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
989                 return NULL;
990         }
991
992         return topic_pool_entry->topic;
993 }
994
995 void stasis_log_bad_type_access(const char *name)
996 {
997         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
998 }
999
1000 /*! \brief A multi object blob data structure to carry user event stasis messages */
1001 struct ast_multi_object_blob {
1002         struct ast_json *blob;                             /*< A blob of JSON data */
1003         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1004 };
1005
1006 /*!
1007  * \internal
1008  * \brief Destructor for \ref ast_multi_object_blob objects
1009  */
1010 static void multi_object_blob_dtor(void *obj)
1011 {
1012         struct ast_multi_object_blob *multi = obj;
1013         int type;
1014         int i;
1015
1016         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1017                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1018                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1019                 }
1020                 AST_VECTOR_FREE(&multi->snapshots[type]);
1021         }
1022         ast_json_unref(multi->blob);
1023 }
1024
1025 /*! \brief Create a stasis user event multi object blob */
1026 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1027 {
1028         int type;
1029         RAII_VAR(struct ast_multi_object_blob *, multi,
1030                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1031                         ao2_cleanup);
1032
1033         ast_assert(blob != NULL);
1034
1035         if (!multi) {
1036                 return NULL;
1037         }
1038
1039         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1040                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1041                         return NULL;
1042                 }
1043         }
1044
1045         multi->blob = ast_json_ref(blob);
1046
1047         ao2_ref(multi, +1);
1048         return multi;
1049 }
1050
1051 /*! \brief Add an object (snapshot) to the blob */
1052 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1053         enum stasis_user_multi_object_snapshot_type type, void *object)
1054 {
1055         if (!multi || !object) {
1056                 return;
1057         }
1058         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1059 }
1060
1061 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1062 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1063         struct stasis_message_type *type, struct ast_json *blob)
1064 {
1065         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1066         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1067         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1068
1069         multi = ast_multi_object_blob_create(blob);
1070         if (!multi) {
1071                 return;
1072         }
1073
1074         channel_snapshot = ast_channel_snapshot_create(chan);
1075         ao2_ref(channel_snapshot, +1);
1076         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1077
1078         message = stasis_message_create(type, multi);
1079         if (message) {
1080                 /* app_userevent still publishes to channel */
1081                 stasis_publish(ast_channel_topic(chan), message);
1082         }
1083 }
1084
1085 /*! \internal \brief convert multi object blob to ari json */
1086 static struct ast_json *multi_user_event_to_json(
1087         struct stasis_message *message,
1088         const struct stasis_message_sanitizer *sanitize)
1089 {
1090         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1091         struct ast_multi_object_blob *multi = stasis_message_data(message);
1092         struct ast_json *blob = multi->blob;
1093         const struct timeval *tv = stasis_message_timestamp(message);
1094         enum stasis_user_multi_object_snapshot_type type;
1095         int i;
1096
1097         out = ast_json_object_create();
1098         if (!out) {
1099                 return NULL;
1100         }
1101
1102         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1103         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1104         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1105         ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */
1106
1107         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1108                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1109                         struct ast_json *json_object = NULL;
1110                         char *name = NULL;
1111                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1112
1113                         switch (type) {
1114                         case STASIS_UMOS_CHANNEL:
1115                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1116                                 name = "channel";
1117                                 break;
1118                         case STASIS_UMOS_BRIDGE:
1119                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1120                                 name = "bridge";
1121                                 break;
1122                         case STASIS_UMOS_ENDPOINT:
1123                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1124                                 name = "endpoint";
1125                                 break;
1126                         }
1127                         if (json_object) {
1128                                 ast_json_object_set(out, name, json_object);
1129                         }
1130                 }
1131         }
1132         return ast_json_ref(out);
1133 }
1134
1135 /*! \internal \brief convert multi object blob to ami string */
1136 static struct ast_str *multi_object_blob_to_ami(void *obj)
1137 {
1138         struct ast_str *ami_str=ast_str_create(1024);
1139         struct ast_str *ami_snapshot;
1140         const struct ast_multi_object_blob *multi = obj;
1141         enum stasis_user_multi_object_snapshot_type type;
1142         int i;
1143
1144         if (!ami_str) {
1145                 return NULL;
1146         }
1147         if (!multi) {
1148                 ast_free(ami_str);
1149                 return NULL;
1150         }
1151
1152         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1153                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1154                         char *name = "";
1155                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1156                         ami_snapshot = NULL;
1157
1158                         if (i > 0) {
1159                                 ast_asprintf(&name, "%d", i + 1);
1160                         }
1161
1162                         switch (type) {
1163                         case STASIS_UMOS_CHANNEL:
1164                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1165                                 break;
1166
1167                         case STASIS_UMOS_BRIDGE:
1168                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1169                                 break;
1170
1171                         case STASIS_UMOS_ENDPOINT:
1172                                 /* currently not sending endpoint snapshots to AMI */
1173                                 break;
1174                         }
1175                         if (ami_snapshot) {
1176                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1177                                 ast_free(ami_snapshot);
1178                         }
1179                 }
1180         }
1181
1182         return ami_str;
1183 }
1184
1185 /*! \internal \brief Callback to pass only user defined parameters from blob */
1186 static int userevent_exclusion_cb(const char *key)
1187 {
1188         if (!strcmp("eventname", key)) {
1189                 return 1;
1190         }
1191         return 0;
1192 }
1193
1194 static struct ast_manager_event_blob *multi_user_event_to_ami(
1195         struct stasis_message *message)
1196 {
1197         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1198         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1199         struct ast_multi_object_blob *multi = stasis_message_data(message);
1200         const char *eventname;
1201
1202         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1203         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1204         object_string = multi_object_blob_to_ami(multi);
1205         if (!object_string || !body) {
1206                 return NULL;
1207         }
1208
1209         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1210                 "%s"
1211                 "UserEvent: %s\r\n"
1212                 "%s",
1213                 ast_str_buffer(object_string),
1214                 eventname,
1215                 ast_str_buffer(body));
1216 }
1217
1218
1219 /*!
1220  * @{ \brief Define multi user event message type(s).
1221  */
1222
1223 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1224         .to_json = multi_user_event_to_json,
1225         .to_ami = multi_user_event_to_ami,
1226         );
1227
1228 /*! @} */
1229
1230 /*! \brief Cleanup function for graceful shutdowns */
1231 static void stasis_cleanup(void)
1232 {
1233         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1234         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1235 }
1236
1237 int stasis_init(void)
1238 {
1239         int cache_init;
1240
1241         /* Be sure the types are cleaned up after the message bus */
1242         ast_register_cleanup(stasis_cleanup);
1243
1244         cache_init = stasis_cache_init();
1245         if (cache_init != 0) {
1246                 return -1;
1247         }
1248
1249         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1250                 return -1;
1251         }
1252         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1253                 return -1;
1254         }
1255
1256         return 0;
1257 }
1258