Stasis: Allow message types to be blocked
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/vector.h"
41 #include "asterisk/stasis_channels.h"
42 #include "asterisk/stasis_bridges.h"
43 #include "asterisk/stasis_endpoints.h"
44 #include "asterisk/config_options.h"
45
46 /*** DOCUMENTATION
47         <managerEvent language="en_US" name="UserEvent">
48                 <managerEventInstance class="EVENT_FLAG_USER">
49                         <synopsis>A user defined event raised from the dialplan.</synopsis>
50                         <syntax>
51                                 <channel_snapshot/>
52                                 <parameter name="UserEvent">
53                                         <para>The event name, as specified in the dialplan.</para>
54                                 </parameter>
55                         </syntax>
56                         <description>
57                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
58                         </description>
59                         <see-also>
60                                 <ref type="application">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="declined_message_types">
67                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
68                                 <configOption name="decline">
69                                         <synopsis>The message type to decline.</synopsis>
70                                         <description>
71                                                 <para>This configuration option defines the name of the Stasis
72                                                 message type that Asterisk is forbidden from creating and can be
73                                                 specified as many times as necessary to achieve the desired result.</para>
74                                                 <enumlist>
75                                                         <enum name="stasis_app_recording_snapshot_type" />
76                                                         <enum name="stasis_app_playback_snapshot_type" />
77                                                         <enum name="stasis_test_message_type" />
78                                                         <enum name="confbridge_start_type" />
79                                                         <enum name="confbridge_end_type" />
80                                                         <enum name="confbridge_join_type" />
81                                                         <enum name="confbridge_leave_type" />
82                                                         <enum name="confbridge_start_record_type" />
83                                                         <enum name="confbridge_stop_record_type" />
84                                                         <enum name="confbridge_mute_type" />
85                                                         <enum name="confbridge_unmute_type" />
86                                                         <enum name="confbridge_talking_type" />
87                                                         <enum name="cel_generic_type" />
88                                                         <enum name="ast_bridge_snapshot_type" />
89                                                         <enum name="ast_bridge_merge_message_type" />
90                                                         <enum name="ast_channel_entered_bridge_type" />
91                                                         <enum name="ast_channel_left_bridge_type" />
92                                                         <enum name="ast_blind_transfer_type" />
93                                                         <enum name="ast_attended_transfer_type" />
94                                                         <enum name="ast_endpoint_snapshot_type" />
95                                                         <enum name="ast_endpoint_state_type" />
96                                                         <enum name="ast_device_state_message_type" />
97                                                         <enum name="ast_test_suite_message_type" />
98                                                         <enum name="ast_mwi_state_type" />
99                                                         <enum name="ast_mwi_vm_app_type" />
100                                                         <enum name="ast_format_register_type" />
101                                                         <enum name="ast_format_unregister_type" />
102                                                         <enum name="ast_manager_get_generic_type" />
103                                                         <enum name="ast_parked_call_type" />
104                                                         <enum name="ast_channel_snapshot_type" />
105                                                         <enum name="ast_channel_dial_type" />
106                                                         <enum name="ast_channel_varset_type" />
107                                                         <enum name="ast_channel_hangup_request_type" />
108                                                         <enum name="ast_channel_dtmf_begin_type" />
109                                                         <enum name="ast_channel_dtmf_end_type" />
110                                                         <enum name="ast_channel_hold_type" />
111                                                         <enum name="ast_channel_unhold_type" />
112                                                         <enum name="ast_channel_chanspy_start_type" />
113                                                         <enum name="ast_channel_chanspy_stop_type" />
114                                                         <enum name="ast_channel_fax_type" />
115                                                         <enum name="ast_channel_hangup_handler_type" />
116                                                         <enum name="ast_channel_moh_start_type" />
117                                                         <enum name="ast_channel_moh_stop_type" />
118                                                         <enum name="ast_channel_monitor_start_type" />
119                                                         <enum name="ast_channel_monitor_stop_type" />
120                                                         <enum name="ast_channel_agent_login_type" />
121                                                         <enum name="ast_channel_agent_logoff_type" />
122                                                         <enum name="ast_channel_talking_start" />
123                                                         <enum name="ast_channel_talking_stop" />
124                                                         <enum name="ast_security_event_type" />
125                                                         <enum name="ast_named_acl_change_type" />
126                                                         <enum name="ast_local_bridge_type" />
127                                                         <enum name="ast_local_optimization_begin_type" />
128                                                         <enum name="ast_local_optimization_end_type" />
129                                                         <enum name="stasis_subscription_change_type" />
130                                                         <enum name="ast_multi_user_event_type" />
131                                                         <enum name="stasis_cache_clear_type" />
132                                                         <enum name="stasis_cache_update_type" />
133                                                         <enum name="ast_network_change_type" />
134                                                         <enum name="ast_system_registry_type" />
135                                                         <enum name="ast_cc_available_type" />
136                                                         <enum name="ast_cc_offertimerstart_type" />
137                                                         <enum name="ast_cc_requested_type" />
138                                                         <enum name="ast_cc_requestacknowledged_type" />
139                                                         <enum name="ast_cc_callerstopmonitoring_type" />
140                                                         <enum name="ast_cc_callerstartmonitoring_type" />
141                                                         <enum name="ast_cc_callerrecalling_type" />
142                                                         <enum name="ast_cc_recallcomplete_type" />
143                                                         <enum name="ast_cc_failure_type" />
144                                                         <enum name="ast_cc_monitorfailed_type" />
145                                                         <enum name="ast_presence_state_message_type" />
146                                                         <enum name="ast_rtp_rtcp_sent_type" />
147                                                         <enum name="ast_rtp_rtcp_received_type" />
148                                                         <enum name="ast_call_pickup_type" />
149                                                         <enum name="aoc_s_type" />
150                                                         <enum name="aoc_d_type" />
151                                                         <enum name="aoc_e_type" />
152                                                         <enum name="dahdichannel_type" />
153                                                         <enum name="mcid_type" />
154                                                         <enum name="session_timeout_type" />
155                                                         <enum name="cdr_read_message_type" />
156                                                         <enum name="cdr_write_message_type" />
157                                                         <enum name="cdr_prop_write_message_type" />
158                                                         <enum name="corosync_ping_message_type" />
159                                                         <enum name="agi_exec_start_type" />
160                                                         <enum name="agi_exec_end_type" />
161                                                         <enum name="agi_async_start_type" />
162                                                         <enum name="agi_async_exec_type" />
163                                                         <enum name="agi_async_end_type" />
164                                                         <enum name="queue_caller_join_type" />
165                                                         <enum name="queue_caller_leave_type" />
166                                                         <enum name="queue_caller_abandon_type" />
167                                                         <enum name="queue_member_status_type" />
168                                                         <enum name="queue_member_added_type" />
169                                                         <enum name="queue_member_removed_type" />
170                                                         <enum name="queue_member_pause_type" />
171                                                         <enum name="queue_member_penalty_type" />
172                                                         <enum name="queue_member_ringinuse_type" />
173                                                         <enum name="queue_agent_called_type" />
174                                                         <enum name="queue_agent_connect_type" />
175                                                         <enum name="queue_agent_complete_type" />
176                                                         <enum name="queue_agent_dump_type" />
177                                                         <enum name="queue_agent_ringnoanswer_type" />
178                                                         <enum name="meetme_join_type" />
179                                                         <enum name="meetme_leave_type" />
180                                                         <enum name="meetme_end_type" />
181                                                         <enum name="meetme_mute_type" />
182                                                         <enum name="meetme_talking_type" />
183                                                         <enum name="meetme_talk_request_type" />
184                                                         <enum name="appcdr_message_type" />
185                                                         <enum name="forkcdr_message_type" />
186                                                         <enum name="cdr_sync_message_type" />
187                                                 </enumlist>
188                                         </description>
189                                 </configOption>
190                         </configObject>
191                 </configFile>
192         </configInfo>
193 ***/
194
195 /*!
196  * \page stasis-impl Stasis Implementation Notes
197  *
198  * \par Reference counting
199  *
200  * Stasis introduces a number of objects, which are tightly related to one
201  * another. Because we rely on ref-counting for memory management, understanding
202  * these relationships is important to understanding this code.
203  *
204  * \code{.txt}
205  *
206  *   stasis_topic <----> stasis_subscription
207  *             ^          ^
208  *              \        /
209  *               \      /
210  *               dispatch
211  *                  |
212  *                  |
213  *                  v
214  *            stasis_message
215  *                  |
216  *                  |
217  *                  v
218  *          stasis_message_type
219  *
220  * \endcode
221  *
222  * The most troubling thing in this chart is the cyclic reference between
223  * stasis_topic and stasis_subscription. This is both unfortunate, and
224  * necessary. Topics need the subscription in order to dispatch messages;
225  * subscriptions need the topics to unsubscribe and check subscription status.
226  *
227  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
228  * topic's reference to a subscription. When the subcription is destroyed, it
229  * will remove its reference to the topic.
230  *
231  * This means that until a subscription has be explicitly unsubscribed, it will
232  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
233  * The destructors of both have assertions regarding this to catch ref-counting
234  * problems where a subscription or topic has had an extra ao2_cleanup().
235  *
236  * The \ref dispatch object is a transient object, which is posted to a
237  * subscription's taskprocessor to send a message to the subscriber. They have
238  * short life cycles, allocated on one thread, destroyed on another.
239  *
240  * During shutdown, or the deletion of a domain object, there are a flurry of
241  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
242  * are processed. Any one of these cleanups could be the one to actually destroy
243  * a given object, so care must be taken to ensure that an object isn't
244  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
245  * that might happen when a RAII_VAR() goes out of scope.
246  *
247  * \par Typical life cycles
248  *
249  *  \li stasis_topic - There are several topics which live for the duration of
250  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
251  *      are actually fed by shorter-lived topics whose lifetime is associated
252  *      with some domain object (like ast_channel_topic() for a given
253  *      ast_channel).
254  *
255  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
256  *      topics, for similar reasons.
257  *
258  *  \li dispatch - Very short lived; just long enough to post a message to a
259  *      subscriber.
260  *
261  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
262  *      irrelevant. Messages are strictly data and have no behavior associated
263  *      with them, so it doesn't really matter if/when they are destroyed. By
264  *      design, a component could hold a ref to a message forever without any
265  *      ill consequences (aside from consuming more memory).
266  *
267  *  \li stasis_message_type - Long life cycles, typically only destroyed on
268  *      module unloading or _clean_ process exit.
269  *
270  * \par Subscriber shutdown sequencing
271  *
272  * Subscribers are sensitive to shutdown sequencing, specifically in how the
273  * reference message types. This is fully detailed on the wiki at
274  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
275  *
276  * In short, the lifetime of the \a data (and \a callback, if in a module) must
277  * be held until the stasis_subscription_final_message() has been received.
278  * Depending on the structure of the subscriber code, this can be handled by
279  * using stasis_subscription_final_message() to free resources on the final
280  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
281  * block until the unsubscribe has completed.
282  */
283
284 /*! Initial size of the subscribers list. */
285 #define INITIAL_SUBSCRIBERS_MAX 4
286
287 /*! The number of buckets to use for topic pools */
288 #define TOPIC_POOL_BUCKETS 57
289
290 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
291
292 /*! \internal */
293 struct stasis_topic {
294         char *name;
295         /*! Variable length array of the subscribers */
296         AST_VECTOR(, struct stasis_subscription *) subscribers;
297
298         /*! Topics forwarding into this topic */
299         AST_VECTOR(, struct stasis_topic *) upstream_topics;
300 };
301
302 /* Forward declarations for the tightly-coupled subscription object */
303 static int topic_add_subscription(struct stasis_topic *topic,
304         struct stasis_subscription *sub);
305
306 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
307
308 /*! \brief Lock two topics. */
309 #define topic_lock_both(topic1, topic2) \
310         do { \
311                 ao2_lock(topic1); \
312                 while (ao2_trylock(topic2)) { \
313                         AO2_DEADLOCK_AVOIDANCE(topic1); \
314                 } \
315         } while (0)
316
317 static void topic_dtor(void *obj)
318 {
319         struct stasis_topic *topic = obj;
320
321         /* Subscribers hold a reference to topics, so they should all be
322          * unsubscribed before we get here. */
323         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
324
325         ast_free(topic->name);
326         topic->name = NULL;
327
328         AST_VECTOR_FREE(&topic->subscribers);
329         AST_VECTOR_FREE(&topic->upstream_topics);
330 }
331
332 struct stasis_topic *stasis_topic_create(const char *name)
333 {
334         struct stasis_topic *topic;
335         int res = 0;
336
337         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
338         if (!topic) {
339                 return NULL;
340         }
341
342         topic->name = ast_strdup(name);
343         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
344         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
345         if (!topic->name || res) {
346                 ao2_cleanup(topic);
347                 return NULL;
348         }
349
350         return topic;
351 }
352
353 const char *stasis_topic_name(const struct stasis_topic *topic)
354 {
355         return topic->name;
356 }
357
358 /*! \internal */
359 struct stasis_subscription {
360         /*! Unique ID for this subscription */
361         char uniqueid[AST_UUID_STR_LEN];
362         /*! Topic subscribed to. */
363         struct stasis_topic *topic;
364         /*! Mailbox for processing incoming messages. */
365         struct ast_taskprocessor *mailbox;
366         /*! Callback function for incoming message processing. */
367         stasis_subscription_cb callback;
368         /*! Data pointer to be handed to the callback. */
369         void *data;
370
371         /*! Condition for joining with subscription. */
372         ast_cond_t join_cond;
373         /*! Flag set when final message for sub has been received.
374          *  Be sure join_lock is held before reading/setting. */
375         int final_message_rxed;
376         /*! Flag set when final message for sub has been processed.
377          *  Be sure join_lock is held before reading/setting. */
378         int final_message_processed;
379 };
380
381 static void subscription_dtor(void *obj)
382 {
383         struct stasis_subscription *sub = obj;
384
385         /* Subscriptions need to be manually unsubscribed before destruction
386          * b/c there's a cyclic reference between topics and subscriptions */
387         ast_assert(!stasis_subscription_is_subscribed(sub));
388         /* If there are any messages in flight to this subscription; that would
389          * be bad. */
390         ast_assert(stasis_subscription_is_done(sub));
391
392         ao2_cleanup(sub->topic);
393         sub->topic = NULL;
394         ast_taskprocessor_unreference(sub->mailbox);
395         sub->mailbox = NULL;
396         ast_cond_destroy(&sub->join_cond);
397 }
398
399 /*!
400  * \brief Invoke the subscription's callback.
401  * \param sub Subscription to invoke.
402  * \param topic Topic message was published to.
403  * \param message Message to send.
404  */
405 static void subscription_invoke(struct stasis_subscription *sub,
406                                   struct stasis_message *message)
407 {
408         /* Notify that the final message has been received */
409         if (stasis_subscription_final_message(sub, message)) {
410                 SCOPED_AO2LOCK(lock, sub);
411
412                 sub->final_message_rxed = 1;
413                 ast_cond_signal(&sub->join_cond);
414         }
415
416         /* Since sub is mostly immutable, no need to lock sub */
417         sub->callback(sub->data, sub, message);
418
419         /* Notify that the final message has been processed */
420         if (stasis_subscription_final_message(sub, message)) {
421                 SCOPED_AO2LOCK(lock, sub);
422
423                 sub->final_message_processed = 1;
424                 ast_cond_signal(&sub->join_cond);
425         }
426 }
427
428 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
429 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
430
431 struct stasis_subscription *internal_stasis_subscribe(
432         struct stasis_topic *topic,
433         stasis_subscription_cb callback,
434         void *data,
435         int needs_mailbox)
436 {
437         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
438
439         if (!topic) {
440                 return NULL;
441         }
442
443         /* The ao2 lock is used for join_cond. */
444         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
445         if (!sub) {
446                 return NULL;
447         }
448
449         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
450
451         if (needs_mailbox) {
452                 /* With a small number of subscribers, a thread-per-sub is
453                  * acceptable. If our usage changes so that we have larger
454                  * numbers of subscribers, we'll probably want to consider
455                  * a threadpool. We had that originally, but with so few
456                  * subscribers it was actually a performance loss instead of
457                  * a gain.
458                  */
459                 sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
460                         TPS_REF_DEFAULT);
461                 if (!sub->mailbox) {
462                         return NULL;
463                 }
464                 ast_taskprocessor_set_local(sub->mailbox, sub);
465                 /* Taskprocessor has a reference */
466                 ao2_ref(sub, +1);
467         }
468
469         ao2_ref(topic, +1);
470         sub->topic = topic;
471         sub->callback = callback;
472         sub->data = data;
473         ast_cond_init(&sub->join_cond, NULL);
474
475         if (topic_add_subscription(topic, sub) != 0) {
476                 return NULL;
477         }
478         send_subscription_subscribe(topic, sub);
479
480         ao2_ref(sub, +1);
481         return sub;
482 }
483
484 struct stasis_subscription *stasis_subscribe(
485         struct stasis_topic *topic,
486         stasis_subscription_cb callback,
487         void *data)
488 {
489         return internal_stasis_subscribe(topic, callback, data, 1);
490 }
491
492 static int sub_cleanup(void *data)
493 {
494         struct stasis_subscription *sub = data;
495         ao2_cleanup(sub);
496         return 0;
497 }
498
499 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
500 {
501         /* The subscription may be the last ref to this topic. Hold
502          * the topic ref open until after the unlock. */
503         RAII_VAR(struct stasis_topic *, topic,
504                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
505
506         if (!sub) {
507                 return NULL;
508         }
509
510         /* We have to remove the subscription first, to ensure the unsubscribe
511          * is the final message */
512         if (topic_remove_subscription(sub->topic, sub) != 0) {
513                 ast_log(LOG_ERROR,
514                         "Internal error: subscription has invalid topic\n");
515                 return NULL;
516         }
517
518         /* Now let everyone know about the unsubscribe */
519         send_subscription_unsubscribe(topic, sub);
520
521         /* When all that's done, remove the ref the mailbox has on the sub */
522         if (sub->mailbox) {
523                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
524         }
525
526         /* Unsubscribing unrefs the subscription */
527         ao2_cleanup(sub);
528         return NULL;
529 }
530
531 void stasis_subscription_join(struct stasis_subscription *subscription)
532 {
533         if (subscription) {
534                 SCOPED_AO2LOCK(lock, subscription);
535
536                 /* Wait until the processed flag has been set */
537                 while (!subscription->final_message_processed) {
538                         ast_cond_wait(&subscription->join_cond,
539                                 ao2_object_get_lockaddr(subscription));
540                 }
541         }
542 }
543
544 int stasis_subscription_is_done(struct stasis_subscription *subscription)
545 {
546         if (subscription) {
547                 SCOPED_AO2LOCK(lock, subscription);
548
549                 return subscription->final_message_rxed;
550         }
551
552         /* Null subscription is about as done as you can get */
553         return 1;
554 }
555
556 struct stasis_subscription *stasis_unsubscribe_and_join(
557         struct stasis_subscription *subscription)
558 {
559         if (!subscription) {
560                 return NULL;
561         }
562
563         /* Bump refcount to hold it past the unsubscribe */
564         ao2_ref(subscription, +1);
565         stasis_unsubscribe(subscription);
566         stasis_subscription_join(subscription);
567         /* Now decrement the refcount back */
568         ao2_cleanup(subscription);
569         return NULL;
570 }
571
572 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
573 {
574         if (sub) {
575                 size_t i;
576                 struct stasis_topic *topic = sub->topic;
577                 SCOPED_AO2LOCK(lock_topic, topic);
578
579                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
580                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
581                                 return 1;
582                         }
583                 }
584         }
585
586         return 0;
587 }
588
589 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
590 {
591         return sub->uniqueid;
592 }
593
594 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
595 {
596         struct stasis_subscription_change *change;
597
598         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
599                 return 0;
600         }
601
602         change = stasis_message_data(msg);
603         if (strcmp("Unsubscribe", change->description)) {
604                 return 0;
605         }
606
607         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
608                 return 0;
609         }
610
611         return 1;
612 }
613
614 /*!
615  * \brief Add a subscriber to a topic.
616  * \param topic Topic
617  * \param sub Subscriber
618  * \return 0 on success
619  * \return Non-zero on error
620  */
621 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
622 {
623         size_t idx;
624         SCOPED_AO2LOCK(lock, topic);
625
626         /* The reference from the topic to the subscription is shared with
627          * the owner of the subscription, which will explicitly unsubscribe
628          * to release it.
629          *
630          * If we bumped the refcount here, the owner would have to unsubscribe
631          * and cleanup, which is a bit awkward. */
632         AST_VECTOR_APPEND(&topic->subscribers, sub);
633
634         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
635                 topic_add_subscription(
636                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
637         }
638
639         return 0;
640 }
641
642 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
643 {
644         size_t idx;
645         SCOPED_AO2LOCK(lock_topic, topic);
646
647         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
648                 topic_remove_subscription(
649                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
650         }
651
652         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
653                 AST_VECTOR_ELEM_CLEANUP_NOOP);
654 }
655
656 /*!
657  * \internal \brief Dispatch a message to a subscriber asynchronously
658  * \param local \ref ast_taskprocessor_local object
659  * \return 0
660  */
661 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
662 {
663         struct stasis_subscription *sub = local->local_data;
664         struct stasis_message *message = local->data;
665
666         subscription_invoke(sub, message);
667         ao2_cleanup(message);
668
669         return 0;
670 }
671
672 /*!
673  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
674  * a published message to a subscriber
675  */
676 struct sync_task_data {
677         ast_mutex_t lock;
678         ast_cond_t cond;
679         int complete;
680         void *task_data;
681 };
682
683 /*!
684  * \internal \brief Dispatch a message to a subscriber synchronously
685  * \param local \ref ast_taskprocessor_local object
686  * \return 0
687  */
688 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
689 {
690         struct stasis_subscription *sub = local->local_data;
691         struct sync_task_data *std = local->data;
692         struct stasis_message *message = std->task_data;
693
694         subscription_invoke(sub, message);
695         ao2_cleanup(message);
696
697         ast_mutex_lock(&std->lock);
698         std->complete = 1;
699         ast_cond_signal(&std->cond);
700         ast_mutex_unlock(&std->lock);
701
702         return 0;
703 }
704
705 /*!
706  * \internal \brief Dispatch a message to a subscriber
707  * \param sub The subscriber to dispatch to
708  * \param message The message to send
709  * \param synchronous If non-zero, synchronize on the subscriber receiving
710  * the message
711  */
712 static void dispatch_message(struct stasis_subscription *sub,
713         struct stasis_message *message,
714         int synchronous)
715 {
716         if (!sub->mailbox) {
717                 /* Dispatch directly */
718                 subscription_invoke(sub, message);
719                 return;
720         }
721
722         /* Bump the message for the taskprocessor push. This will get de-ref'd
723          * by the task processor callback.
724          */
725         ao2_bump(message);
726         if (!synchronous) {
727                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
728                         /* Push failed; ugh. */
729                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
730                         ao2_cleanup(message);
731                 }
732         } else {
733                 struct sync_task_data std;
734
735                 ast_mutex_init(&std.lock);
736                 ast_cond_init(&std.cond, NULL);
737                 std.complete = 0;
738                 std.task_data = message;
739
740                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
741                         /* Push failed; ugh. */
742                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
743                         ao2_cleanup(message);
744                         ast_mutex_destroy(&std.lock);
745                         ast_cond_destroy(&std.cond);
746                         return;
747                 }
748
749                 ast_mutex_lock(&std.lock);
750                 while (!std.complete) {
751                         ast_cond_wait(&std.cond, &std.lock);
752                 }
753                 ast_mutex_unlock(&std.lock);
754
755                 ast_mutex_destroy(&std.lock);
756                 ast_cond_destroy(&std.cond);
757         }
758 }
759
760 /*!
761  * \internal \brief Publish a message to a topic's subscribers
762  * \brief topic The topic to publish to
763  * \brief message The message to publish
764  * \brief sync_sub An optional subscriber of the topic to publish synchronously
765  * to
766  */
767 static void publish_msg(struct stasis_topic *topic,
768         struct stasis_message *message, struct stasis_subscription *sync_sub)
769 {
770         size_t i;
771
772         ast_assert(topic != NULL);
773         ast_assert(message != NULL);
774
775         /*
776          * The topic may be unref'ed by the subscription invocation.
777          * Make sure we hold onto a reference while dispatching.
778          */
779         ao2_ref(topic, +1);
780         ao2_lock(topic);
781         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
782                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
783
784                 ast_assert(sub != NULL);
785
786                 dispatch_message(sub, message, (sub == sync_sub));
787         }
788         ao2_unlock(topic);
789         ao2_ref(topic, -1);
790 }
791
792 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
793 {
794         publish_msg(topic, message, NULL);
795 }
796
797 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
798 {
799         ast_assert(sub != NULL);
800
801         publish_msg(sub->topic, message, sub);
802 }
803
804 /*!
805  * \brief Forwarding information
806  *
807  * Any message posted to \a from_topic is forwarded to \a to_topic.
808  *
809  * In cases where both the \a from_topic and \a to_topic need to be locked,
810  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
811  */
812 struct stasis_forward {
813         /*! Originating topic */
814         struct stasis_topic *from_topic;
815         /*! Destination topic */
816         struct stasis_topic *to_topic;
817 };
818
819 static void forward_dtor(void *obj)
820 {
821         struct stasis_forward *forward = obj;
822
823         ao2_cleanup(forward->from_topic);
824         forward->from_topic = NULL;
825         ao2_cleanup(forward->to_topic);
826         forward->to_topic = NULL;
827 }
828
829 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
830 {
831         int idx;
832         struct stasis_topic *from;
833         struct stasis_topic *to;
834
835         if (!forward) {
836                 return NULL;
837         }
838
839         from = forward->from_topic;
840         to = forward->to_topic;
841
842         if (from && to) {
843                 topic_lock_both(to, from);
844                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
845                         AST_VECTOR_ELEM_CLEANUP_NOOP);
846
847                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
848                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
849                 }
850                 ao2_unlock(from);
851                 ao2_unlock(to);
852         }
853
854         ao2_cleanup(forward);
855
856         return NULL;
857 }
858
859 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
860         struct stasis_topic *to_topic)
861 {
862         int res;
863         size_t idx;
864         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
865
866         if (!from_topic || !to_topic) {
867                 return NULL;
868         }
869
870         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
871         if (!forward) {
872                 return NULL;
873         }
874
875         /* Forwards to ourselves are implicit. */
876         if (to_topic == from_topic) {
877                 return ao2_bump(forward);
878         }
879
880         forward->from_topic = ao2_bump(from_topic);
881         forward->to_topic = ao2_bump(to_topic);
882
883         topic_lock_both(to_topic, from_topic);
884         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
885         if (res != 0) {
886                 ao2_unlock(from_topic);
887                 ao2_unlock(to_topic);
888                 return NULL;
889         }
890
891         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
892                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
893         }
894         ao2_unlock(from_topic);
895         ao2_unlock(to_topic);
896
897         return ao2_bump(forward);
898 }
899
900 static void subscription_change_dtor(void *obj)
901 {
902         struct stasis_subscription_change *change = obj;
903
904         ast_string_field_free_memory(change);
905         ao2_cleanup(change->topic);
906 }
907
908 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
909 {
910         struct stasis_subscription_change *change;
911
912         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
913         if (!change || ast_string_field_init(change, 128)) {
914                 ao2_cleanup(change);
915                 return NULL;
916         }
917
918         ast_string_field_set(change, uniqueid, uniqueid);
919         ast_string_field_set(change, description, description);
920         ao2_ref(topic, +1);
921         change->topic = topic;
922
923         return change;
924 }
925
926 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
927 {
928         struct stasis_subscription_change *change;
929         struct stasis_message *msg;
930
931         /* This assumes that we have already unsubscribed */
932         ast_assert(stasis_subscription_is_subscribed(sub));
933
934         if (!stasis_subscription_change_type()) {
935                 return;
936         }
937
938         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
939         if (!change) {
940                 return;
941         }
942
943         msg = stasis_message_create(stasis_subscription_change_type(), change);
944         if (!msg) {
945                 ao2_cleanup(change);
946                 return;
947         }
948
949         stasis_publish(topic, msg);
950         ao2_cleanup(msg);
951         ao2_cleanup(change);
952 }
953
954 static void send_subscription_unsubscribe(struct stasis_topic *topic,
955         struct stasis_subscription *sub)
956 {
957         struct stasis_subscription_change *change;
958         struct stasis_message *msg;
959
960         /* This assumes that we have already unsubscribed */
961         ast_assert(!stasis_subscription_is_subscribed(sub));
962
963         if (!stasis_subscription_change_type()) {
964                 return;
965         }
966
967         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
968         if (!change) {
969                 return;
970         }
971
972         msg = stasis_message_create(stasis_subscription_change_type(), change);
973         if (!msg) {
974                 ao2_cleanup(change);
975                 return;
976         }
977
978         stasis_publish(topic, msg);
979
980         /* Now we have to dispatch to the subscription itself */
981         dispatch_message(sub, msg, 0);
982
983         ao2_cleanup(msg);
984         ao2_cleanup(change);
985 }
986
987 struct topic_pool_entry {
988         struct stasis_forward *forward;
989         struct stasis_topic *topic;
990 };
991
992 static void topic_pool_entry_dtor(void *obj)
993 {
994         struct topic_pool_entry *entry = obj;
995
996         entry->forward = stasis_forward_cancel(entry->forward);
997         ao2_cleanup(entry->topic);
998         entry->topic = NULL;
999 }
1000
1001 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1002 {
1003         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1004                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1005 }
1006
1007 struct stasis_topic_pool {
1008         struct ao2_container *pool_container;
1009         struct stasis_topic *pool_topic;
1010 };
1011
1012 static void topic_pool_dtor(void *obj)
1013 {
1014         struct stasis_topic_pool *pool = obj;
1015
1016         ao2_cleanup(pool->pool_container);
1017         pool->pool_container = NULL;
1018         ao2_cleanup(pool->pool_topic);
1019         pool->pool_topic = NULL;
1020 }
1021
1022 static int topic_pool_entry_hash(const void *obj, const int flags)
1023 {
1024         const struct topic_pool_entry *object;
1025         const char *key;
1026
1027         switch (flags & OBJ_SEARCH_MASK) {
1028         case OBJ_SEARCH_KEY:
1029                 key = obj;
1030                 break;
1031         case OBJ_SEARCH_OBJECT:
1032                 object = obj;
1033                 key = stasis_topic_name(object->topic);
1034                 break;
1035         default:
1036                 /* Hash can only work on something with a full key. */
1037                 ast_assert(0);
1038                 return 0;
1039         }
1040         return ast_str_case_hash(key);
1041 }
1042
1043 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1044 {
1045         const struct topic_pool_entry *object_left = obj;
1046         const struct topic_pool_entry *object_right = arg;
1047         const char *right_key = arg;
1048         int cmp;
1049
1050         switch (flags & OBJ_SEARCH_MASK) {
1051         case OBJ_SEARCH_OBJECT:
1052                 right_key = stasis_topic_name(object_right->topic);
1053                 /* Fall through */
1054         case OBJ_SEARCH_KEY:
1055                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1056                 break;
1057         case OBJ_SEARCH_PARTIAL_KEY:
1058                 /* Not supported by container */
1059                 ast_assert(0);
1060                 cmp = -1;
1061                 break;
1062         default:
1063                 /*
1064                  * What arg points to is specific to this traversal callback
1065                  * and has no special meaning to astobj2.
1066                  */
1067                 cmp = 0;
1068                 break;
1069         }
1070         if (cmp) {
1071                 return 0;
1072         }
1073         /*
1074          * At this point the traversal callback is identical to a sorted
1075          * container.
1076          */
1077         return CMP_MATCH;
1078 }
1079
1080 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1081 {
1082         struct stasis_topic_pool *pool;
1083
1084         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1085         if (!pool) {
1086                 return NULL;
1087         }
1088
1089         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1090                 topic_pool_entry_hash, topic_pool_entry_cmp);
1091         if (!pool->pool_container) {
1092                 ao2_cleanup(pool);
1093                 return NULL;
1094         }
1095         ao2_ref(pooled_topic, +1);
1096         pool->pool_topic = pooled_topic;
1097
1098         return pool;
1099 }
1100
1101 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1102 {
1103         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1104         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1105
1106         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1107         if (topic_pool_entry) {
1108                 return topic_pool_entry->topic;
1109         }
1110
1111         topic_pool_entry = topic_pool_entry_alloc();
1112         if (!topic_pool_entry) {
1113                 return NULL;
1114         }
1115
1116         topic_pool_entry->topic = stasis_topic_create(topic_name);
1117         if (!topic_pool_entry->topic) {
1118                 return NULL;
1119         }
1120
1121         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1122         if (!topic_pool_entry->forward) {
1123                 return NULL;
1124         }
1125
1126         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1127                 return NULL;
1128         }
1129
1130         return topic_pool_entry->topic;
1131 }
1132
1133 void stasis_log_bad_type_access(const char *name)
1134 {
1135         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1136 }
1137
1138 /*! \brief A multi object blob data structure to carry user event stasis messages */
1139 struct ast_multi_object_blob {
1140         struct ast_json *blob;                             /*< A blob of JSON data */
1141         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1142 };
1143
1144 /*!
1145  * \internal
1146  * \brief Destructor for \ref ast_multi_object_blob objects
1147  */
1148 static void multi_object_blob_dtor(void *obj)
1149 {
1150         struct ast_multi_object_blob *multi = obj;
1151         int type;
1152         int i;
1153
1154         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1155                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1156                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1157                 }
1158                 AST_VECTOR_FREE(&multi->snapshots[type]);
1159         }
1160         ast_json_unref(multi->blob);
1161 }
1162
1163 /*! \brief Create a stasis user event multi object blob */
1164 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1165 {
1166         int type;
1167         RAII_VAR(struct ast_multi_object_blob *, multi,
1168                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1169                         ao2_cleanup);
1170
1171         ast_assert(blob != NULL);
1172
1173         if (!multi) {
1174                 return NULL;
1175         }
1176
1177         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1178                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1179                         return NULL;
1180                 }
1181         }
1182
1183         multi->blob = ast_json_ref(blob);
1184
1185         ao2_ref(multi, +1);
1186         return multi;
1187 }
1188
1189 /*! \brief Add an object (snapshot) to the blob */
1190 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1191         enum stasis_user_multi_object_snapshot_type type, void *object)
1192 {
1193         if (!multi || !object) {
1194                 return;
1195         }
1196         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1197 }
1198
1199 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1200 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1201         struct stasis_message_type *type, struct ast_json *blob)
1202 {
1203         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1204         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1205         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1206
1207         if (!type) {
1208                 return;
1209         }
1210
1211         multi = ast_multi_object_blob_create(blob);
1212         if (!multi) {
1213                 return;
1214         }
1215
1216         channel_snapshot = ast_channel_snapshot_create(chan);
1217         ao2_ref(channel_snapshot, +1);
1218         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1219
1220         message = stasis_message_create(type, multi);
1221         if (message) {
1222                 /* app_userevent still publishes to channel */
1223                 stasis_publish(ast_channel_topic(chan), message);
1224         }
1225 }
1226
1227 /*! \internal \brief convert multi object blob to ari json */
1228 static struct ast_json *multi_user_event_to_json(
1229         struct stasis_message *message,
1230         const struct stasis_message_sanitizer *sanitize)
1231 {
1232         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1233         struct ast_multi_object_blob *multi = stasis_message_data(message);
1234         struct ast_json *blob = multi->blob;
1235         const struct timeval *tv = stasis_message_timestamp(message);
1236         enum stasis_user_multi_object_snapshot_type type;
1237         int i;
1238
1239         out = ast_json_object_create();
1240         if (!out) {
1241                 return NULL;
1242         }
1243
1244         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1245         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1246         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1247         ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */
1248
1249         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1250                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1251                         struct ast_json *json_object = NULL;
1252                         char *name = NULL;
1253                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1254
1255                         switch (type) {
1256                         case STASIS_UMOS_CHANNEL:
1257                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1258                                 name = "channel";
1259                                 break;
1260                         case STASIS_UMOS_BRIDGE:
1261                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1262                                 name = "bridge";
1263                                 break;
1264                         case STASIS_UMOS_ENDPOINT:
1265                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1266                                 name = "endpoint";
1267                                 break;
1268                         }
1269                         if (json_object) {
1270                                 ast_json_object_set(out, name, json_object);
1271                         }
1272                 }
1273         }
1274         return ast_json_ref(out);
1275 }
1276
1277 /*! \internal \brief convert multi object blob to ami string */
1278 static struct ast_str *multi_object_blob_to_ami(void *obj)
1279 {
1280         struct ast_str *ami_str=ast_str_create(1024);
1281         struct ast_str *ami_snapshot;
1282         const struct ast_multi_object_blob *multi = obj;
1283         enum stasis_user_multi_object_snapshot_type type;
1284         int i;
1285
1286         if (!ami_str) {
1287                 return NULL;
1288         }
1289         if (!multi) {
1290                 ast_free(ami_str);
1291                 return NULL;
1292         }
1293
1294         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1295                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1296                         char *name = "";
1297                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1298                         ami_snapshot = NULL;
1299
1300                         if (i > 0) {
1301                                 ast_asprintf(&name, "%d", i + 1);
1302                         }
1303
1304                         switch (type) {
1305                         case STASIS_UMOS_CHANNEL:
1306                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1307                                 break;
1308
1309                         case STASIS_UMOS_BRIDGE:
1310                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1311                                 break;
1312
1313                         case STASIS_UMOS_ENDPOINT:
1314                                 /* currently not sending endpoint snapshots to AMI */
1315                                 break;
1316                         }
1317                         if (ami_snapshot) {
1318                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1319                                 ast_free(ami_snapshot);
1320                         }
1321                 }
1322         }
1323
1324         return ami_str;
1325 }
1326
1327 /*! \internal \brief Callback to pass only user defined parameters from blob */
1328 static int userevent_exclusion_cb(const char *key)
1329 {
1330         if (!strcmp("eventname", key)) {
1331                 return 1;
1332         }
1333         return 0;
1334 }
1335
1336 static struct ast_manager_event_blob *multi_user_event_to_ami(
1337         struct stasis_message *message)
1338 {
1339         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1340         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1341         struct ast_multi_object_blob *multi = stasis_message_data(message);
1342         const char *eventname;
1343
1344         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1345         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1346         object_string = multi_object_blob_to_ami(multi);
1347         if (!object_string || !body) {
1348                 return NULL;
1349         }
1350
1351         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1352                 "%s"
1353                 "UserEvent: %s\r\n"
1354                 "%s",
1355                 ast_str_buffer(object_string),
1356                 eventname,
1357                 ast_str_buffer(body));
1358 }
1359
1360 /*! \brief A structure to hold global configuration-related options */
1361 struct stasis_declined_config {
1362         /*! The list of message types to decline */
1363         struct ao2_container *declined;
1364 };
1365
1366
1367 struct stasis_config {
1368         struct stasis_declined_config *declined_message_types;
1369 };
1370
1371 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1372 static struct aco_type declined_option = {
1373         .type = ACO_GLOBAL,
1374         .name = "declined_message_types",
1375         .item_offset = offsetof(struct stasis_config, declined_message_types),
1376         .category_match = ACO_WHITELIST,
1377         .category = "^declined_message_types$",
1378 };
1379
1380 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1381
1382 struct aco_file stasis_conf = {
1383         .filename = "stasis.conf",
1384         .types = ACO_TYPES(&declined_option),
1385 };
1386
1387 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1388 static AO2_GLOBAL_OBJ_STATIC(globals);
1389
1390 static void *stasis_config_alloc(void);
1391
1392 /*! \brief Register information about the configs being processed by this module */
1393 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1394         .files = ACO_FILES(&stasis_conf),
1395 );
1396
1397 static void stasis_declined_config_destructor(void *obj)
1398 {
1399         struct stasis_declined_config *declined = obj;
1400         ao2_cleanup(declined->declined);
1401 }
1402
1403 static void stasis_config_destructor(void *obj)
1404 {
1405         struct stasis_config *cfg = obj;
1406         ao2_cleanup(cfg->declined_message_types);
1407 }
1408
1409 static void *stasis_config_alloc(void)
1410 {
1411         struct stasis_config *cfg;
1412
1413         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1414                 return NULL;
1415         }
1416
1417         /* Allocate/initialize memory */
1418         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor);
1419         if (!cfg->declined_message_types) {
1420                 goto error;
1421         }
1422
1423         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1424         if (!cfg->declined_message_types->declined) {
1425                 goto error;
1426         }
1427
1428         return cfg;
1429 error:
1430         ao2_ref(cfg, -1);
1431         return NULL;
1432 }
1433
1434 int stasis_message_type_declined(const char *name)
1435 {
1436         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1437         char *name_in_declined;
1438         int res;
1439
1440         if (!cfg || !cfg->declined_message_types) {
1441                 return 0;
1442         }
1443
1444         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1445         res = name_in_declined ? 1 : 0;
1446         ao2_cleanup(name_in_declined);
1447         if (res) {
1448                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1449         }
1450         return res;
1451 }
1452
1453 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1454 {
1455         struct stasis_declined_config *declined = obj;
1456
1457         if (ast_strlen_zero(var->value)) {
1458                 return 0;
1459         }
1460
1461         if (ast_str_container_add(declined->declined, var->value)) {
1462                 return -1;
1463         }
1464
1465         return 0;
1466 }
1467
1468 /*!
1469  * @{ \brief Define multi user event message type(s).
1470  */
1471
1472 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1473         .to_json = multi_user_event_to_json,
1474         .to_ami = multi_user_event_to_ami,
1475         );
1476
1477 /*! @} */
1478
1479 /*! \brief Cleanup function for graceful shutdowns */
1480 static void stasis_cleanup(void)
1481 {
1482         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1483         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1484         aco_info_destroy(&cfg_info);
1485         ao2_global_obj_release(globals);
1486 }
1487
1488 int stasis_init(void)
1489 {
1490         int cache_init;
1491
1492         /* Be sure the types are cleaned up after the message bus */
1493         ast_register_cleanup(stasis_cleanup);
1494
1495         if (aco_info_init(&cfg_info)) {
1496                 return -1;
1497         }
1498
1499         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0);
1500
1501         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1502                 RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup);
1503
1504                 if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) {
1505                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1506                         return -1;
1507                 }
1508
1509                 ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n");
1510                 ao2_global_obj_replace_unref(globals, stasis_cfg);
1511         }
1512
1513         cache_init = stasis_cache_init();
1514         if (cache_init != 0) {
1515                 return -1;
1516         }
1517
1518         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1519                 return -1;
1520         }
1521         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1522                 return -1;
1523         }
1524
1525         return 0;
1526 }
1527