Stasis: Only log errors for non-declined types
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/vector.h"
41 #include "asterisk/stasis_channels.h"
42 #include "asterisk/stasis_bridges.h"
43 #include "asterisk/stasis_endpoints.h"
44 #include "asterisk/config_options.h"
45
46 /*** DOCUMENTATION
47         <managerEvent language="en_US" name="UserEvent">
48                 <managerEventInstance class="EVENT_FLAG_USER">
49                         <synopsis>A user defined event raised from the dialplan.</synopsis>
50                         <syntax>
51                                 <channel_snapshot/>
52                                 <parameter name="UserEvent">
53                                         <para>The event name, as specified in the dialplan.</para>
54                                 </parameter>
55                         </syntax>
56                         <description>
57                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
58                         </description>
59                         <see-also>
60                                 <ref type="application">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="declined_message_types">
67                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
68                                 <configOption name="decline">
69                                         <synopsis>The message type to decline.</synopsis>
70                                         <description>
71                                                 <para>This configuration option defines the name of the Stasis
72                                                 message type that Asterisk is forbidden from creating and can be
73                                                 specified as many times as necessary to achieve the desired result.</para>
74                                                 <enumlist>
75                                                         <enum name="stasis_app_recording_snapshot_type" />
76                                                         <enum name="stasis_app_playback_snapshot_type" />
77                                                         <enum name="stasis_test_message_type" />
78                                                         <enum name="confbridge_start_type" />
79                                                         <enum name="confbridge_end_type" />
80                                                         <enum name="confbridge_join_type" />
81                                                         <enum name="confbridge_leave_type" />
82                                                         <enum name="confbridge_start_record_type" />
83                                                         <enum name="confbridge_stop_record_type" />
84                                                         <enum name="confbridge_mute_type" />
85                                                         <enum name="confbridge_unmute_type" />
86                                                         <enum name="confbridge_talking_type" />
87                                                         <enum name="cel_generic_type" />
88                                                         <enum name="ast_bridge_snapshot_type" />
89                                                         <enum name="ast_bridge_merge_message_type" />
90                                                         <enum name="ast_channel_entered_bridge_type" />
91                                                         <enum name="ast_channel_left_bridge_type" />
92                                                         <enum name="ast_blind_transfer_type" />
93                                                         <enum name="ast_attended_transfer_type" />
94                                                         <enum name="ast_endpoint_snapshot_type" />
95                                                         <enum name="ast_endpoint_state_type" />
96                                                         <enum name="ast_device_state_message_type" />
97                                                         <enum name="ast_test_suite_message_type" />
98                                                         <enum name="ast_mwi_state_type" />
99                                                         <enum name="ast_mwi_vm_app_type" />
100                                                         <enum name="ast_format_register_type" />
101                                                         <enum name="ast_format_unregister_type" />
102                                                         <enum name="ast_manager_get_generic_type" />
103                                                         <enum name="ast_parked_call_type" />
104                                                         <enum name="ast_channel_snapshot_type" />
105                                                         <enum name="ast_channel_dial_type" />
106                                                         <enum name="ast_channel_varset_type" />
107                                                         <enum name="ast_channel_hangup_request_type" />
108                                                         <enum name="ast_channel_dtmf_begin_type" />
109                                                         <enum name="ast_channel_dtmf_end_type" />
110                                                         <enum name="ast_channel_hold_type" />
111                                                         <enum name="ast_channel_unhold_type" />
112                                                         <enum name="ast_channel_chanspy_start_type" />
113                                                         <enum name="ast_channel_chanspy_stop_type" />
114                                                         <enum name="ast_channel_fax_type" />
115                                                         <enum name="ast_channel_hangup_handler_type" />
116                                                         <enum name="ast_channel_moh_start_type" />
117                                                         <enum name="ast_channel_moh_stop_type" />
118                                                         <enum name="ast_channel_monitor_start_type" />
119                                                         <enum name="ast_channel_monitor_stop_type" />
120                                                         <enum name="ast_channel_agent_login_type" />
121                                                         <enum name="ast_channel_agent_logoff_type" />
122                                                         <enum name="ast_channel_talking_start" />
123                                                         <enum name="ast_channel_talking_stop" />
124                                                         <enum name="ast_security_event_type" />
125                                                         <enum name="ast_named_acl_change_type" />
126                                                         <enum name="ast_local_bridge_type" />
127                                                         <enum name="ast_local_optimization_begin_type" />
128                                                         <enum name="ast_local_optimization_end_type" />
129                                                         <enum name="stasis_subscription_change_type" />
130                                                         <enum name="ast_multi_user_event_type" />
131                                                         <enum name="stasis_cache_clear_type" />
132                                                         <enum name="stasis_cache_update_type" />
133                                                         <enum name="ast_network_change_type" />
134                                                         <enum name="ast_system_registry_type" />
135                                                         <enum name="ast_cc_available_type" />
136                                                         <enum name="ast_cc_offertimerstart_type" />
137                                                         <enum name="ast_cc_requested_type" />
138                                                         <enum name="ast_cc_requestacknowledged_type" />
139                                                         <enum name="ast_cc_callerstopmonitoring_type" />
140                                                         <enum name="ast_cc_callerstartmonitoring_type" />
141                                                         <enum name="ast_cc_callerrecalling_type" />
142                                                         <enum name="ast_cc_recallcomplete_type" />
143                                                         <enum name="ast_cc_failure_type" />
144                                                         <enum name="ast_cc_monitorfailed_type" />
145                                                         <enum name="ast_presence_state_message_type" />
146                                                         <enum name="ast_rtp_rtcp_sent_type" />
147                                                         <enum name="ast_rtp_rtcp_received_type" />
148                                                         <enum name="ast_call_pickup_type" />
149                                                         <enum name="aoc_s_type" />
150                                                         <enum name="aoc_d_type" />
151                                                         <enum name="aoc_e_type" />
152                                                         <enum name="dahdichannel_type" />
153                                                         <enum name="mcid_type" />
154                                                         <enum name="session_timeout_type" />
155                                                         <enum name="cdr_read_message_type" />
156                                                         <enum name="cdr_write_message_type" />
157                                                         <enum name="cdr_prop_write_message_type" />
158                                                         <enum name="corosync_ping_message_type" />
159                                                         <enum name="agi_exec_start_type" />
160                                                         <enum name="agi_exec_end_type" />
161                                                         <enum name="agi_async_start_type" />
162                                                         <enum name="agi_async_exec_type" />
163                                                         <enum name="agi_async_end_type" />
164                                                         <enum name="queue_caller_join_type" />
165                                                         <enum name="queue_caller_leave_type" />
166                                                         <enum name="queue_caller_abandon_type" />
167                                                         <enum name="queue_member_status_type" />
168                                                         <enum name="queue_member_added_type" />
169                                                         <enum name="queue_member_removed_type" />
170                                                         <enum name="queue_member_pause_type" />
171                                                         <enum name="queue_member_penalty_type" />
172                                                         <enum name="queue_member_ringinuse_type" />
173                                                         <enum name="queue_agent_called_type" />
174                                                         <enum name="queue_agent_connect_type" />
175                                                         <enum name="queue_agent_complete_type" />
176                                                         <enum name="queue_agent_dump_type" />
177                                                         <enum name="queue_agent_ringnoanswer_type" />
178                                                         <enum name="meetme_join_type" />
179                                                         <enum name="meetme_leave_type" />
180                                                         <enum name="meetme_end_type" />
181                                                         <enum name="meetme_mute_type" />
182                                                         <enum name="meetme_talking_type" />
183                                                         <enum name="meetme_talk_request_type" />
184                                                         <enum name="appcdr_message_type" />
185                                                         <enum name="forkcdr_message_type" />
186                                                         <enum name="cdr_sync_message_type" />
187                                                 </enumlist>
188                                         </description>
189                                 </configOption>
190                         </configObject>
191                 </configFile>
192         </configInfo>
193 ***/
194
195 /*!
196  * \page stasis-impl Stasis Implementation Notes
197  *
198  * \par Reference counting
199  *
200  * Stasis introduces a number of objects, which are tightly related to one
201  * another. Because we rely on ref-counting for memory management, understanding
202  * these relationships is important to understanding this code.
203  *
204  * \code{.txt}
205  *
206  *   stasis_topic <----> stasis_subscription
207  *             ^          ^
208  *              \        /
209  *               \      /
210  *               dispatch
211  *                  |
212  *                  |
213  *                  v
214  *            stasis_message
215  *                  |
216  *                  |
217  *                  v
218  *          stasis_message_type
219  *
220  * \endcode
221  *
222  * The most troubling thing in this chart is the cyclic reference between
223  * stasis_topic and stasis_subscription. This is both unfortunate, and
224  * necessary. Topics need the subscription in order to dispatch messages;
225  * subscriptions need the topics to unsubscribe and check subscription status.
226  *
227  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
228  * topic's reference to a subscription. When the subcription is destroyed, it
229  * will remove its reference to the topic.
230  *
231  * This means that until a subscription has be explicitly unsubscribed, it will
232  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
233  * The destructors of both have assertions regarding this to catch ref-counting
234  * problems where a subscription or topic has had an extra ao2_cleanup().
235  *
236  * The \ref dispatch object is a transient object, which is posted to a
237  * subscription's taskprocessor to send a message to the subscriber. They have
238  * short life cycles, allocated on one thread, destroyed on another.
239  *
240  * During shutdown, or the deletion of a domain object, there are a flurry of
241  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
242  * are processed. Any one of these cleanups could be the one to actually destroy
243  * a given object, so care must be taken to ensure that an object isn't
244  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
245  * that might happen when a RAII_VAR() goes out of scope.
246  *
247  * \par Typical life cycles
248  *
249  *  \li stasis_topic - There are several topics which live for the duration of
250  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
251  *      are actually fed by shorter-lived topics whose lifetime is associated
252  *      with some domain object (like ast_channel_topic() for a given
253  *      ast_channel).
254  *
255  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
256  *      topics, for similar reasons.
257  *
258  *  \li dispatch - Very short lived; just long enough to post a message to a
259  *      subscriber.
260  *
261  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
262  *      irrelevant. Messages are strictly data and have no behavior associated
263  *      with them, so it doesn't really matter if/when they are destroyed. By
264  *      design, a component could hold a ref to a message forever without any
265  *      ill consequences (aside from consuming more memory).
266  *
267  *  \li stasis_message_type - Long life cycles, typically only destroyed on
268  *      module unloading or _clean_ process exit.
269  *
270  * \par Subscriber shutdown sequencing
271  *
272  * Subscribers are sensitive to shutdown sequencing, specifically in how the
273  * reference message types. This is fully detailed on the wiki at
274  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
275  *
276  * In short, the lifetime of the \a data (and \a callback, if in a module) must
277  * be held until the stasis_subscription_final_message() has been received.
278  * Depending on the structure of the subscriber code, this can be handled by
279  * using stasis_subscription_final_message() to free resources on the final
280  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
281  * block until the unsubscribe has completed.
282  */
283
284 /*! Initial size of the subscribers list. */
285 #define INITIAL_SUBSCRIBERS_MAX 4
286
287 /*! The number of buckets to use for topic pools */
288 #define TOPIC_POOL_BUCKETS 57
289
290 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
291
292 /*! \internal */
293 struct stasis_topic {
294         char *name;
295         /*! Variable length array of the subscribers */
296         AST_VECTOR(, struct stasis_subscription *) subscribers;
297
298         /*! Topics forwarding into this topic */
299         AST_VECTOR(, struct stasis_topic *) upstream_topics;
300 };
301
302 /* Forward declarations for the tightly-coupled subscription object */
303 static int topic_add_subscription(struct stasis_topic *topic,
304         struct stasis_subscription *sub);
305
306 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
307
308 /*! \brief Lock two topics. */
309 #define topic_lock_both(topic1, topic2) \
310         do { \
311                 ao2_lock(topic1); \
312                 while (ao2_trylock(topic2)) { \
313                         AO2_DEADLOCK_AVOIDANCE(topic1); \
314                 } \
315         } while (0)
316
317 static void topic_dtor(void *obj)
318 {
319         struct stasis_topic *topic = obj;
320
321         /* Subscribers hold a reference to topics, so they should all be
322          * unsubscribed before we get here. */
323         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
324
325         ast_free(topic->name);
326         topic->name = NULL;
327
328         AST_VECTOR_FREE(&topic->subscribers);
329         AST_VECTOR_FREE(&topic->upstream_topics);
330 }
331
332 struct stasis_topic *stasis_topic_create(const char *name)
333 {
334         struct stasis_topic *topic;
335         int res = 0;
336
337         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
338         if (!topic) {
339                 return NULL;
340         }
341
342         topic->name = ast_strdup(name);
343         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
344         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
345         if (!topic->name || res) {
346                 ao2_cleanup(topic);
347                 return NULL;
348         }
349
350         return topic;
351 }
352
353 const char *stasis_topic_name(const struct stasis_topic *topic)
354 {
355         return topic->name;
356 }
357
358 /*! \internal */
359 struct stasis_subscription {
360         /*! Unique ID for this subscription */
361         char uniqueid[AST_UUID_STR_LEN];
362         /*! Topic subscribed to. */
363         struct stasis_topic *topic;
364         /*! Mailbox for processing incoming messages. */
365         struct ast_taskprocessor *mailbox;
366         /*! Callback function for incoming message processing. */
367         stasis_subscription_cb callback;
368         /*! Data pointer to be handed to the callback. */
369         void *data;
370
371         /*! Condition for joining with subscription. */
372         ast_cond_t join_cond;
373         /*! Flag set when final message for sub has been received.
374          *  Be sure join_lock is held before reading/setting. */
375         int final_message_rxed;
376         /*! Flag set when final message for sub has been processed.
377          *  Be sure join_lock is held before reading/setting. */
378         int final_message_processed;
379 };
380
381 static void subscription_dtor(void *obj)
382 {
383         struct stasis_subscription *sub = obj;
384
385         /* Subscriptions need to be manually unsubscribed before destruction
386          * b/c there's a cyclic reference between topics and subscriptions */
387         ast_assert(!stasis_subscription_is_subscribed(sub));
388         /* If there are any messages in flight to this subscription; that would
389          * be bad. */
390         ast_assert(stasis_subscription_is_done(sub));
391
392         ao2_cleanup(sub->topic);
393         sub->topic = NULL;
394         ast_taskprocessor_unreference(sub->mailbox);
395         sub->mailbox = NULL;
396         ast_cond_destroy(&sub->join_cond);
397 }
398
399 /*!
400  * \brief Invoke the subscription's callback.
401  * \param sub Subscription to invoke.
402  * \param topic Topic message was published to.
403  * \param message Message to send.
404  */
405 static void subscription_invoke(struct stasis_subscription *sub,
406                                   struct stasis_message *message)
407 {
408         /* Notify that the final message has been received */
409         if (stasis_subscription_final_message(sub, message)) {
410                 SCOPED_AO2LOCK(lock, sub);
411
412                 sub->final_message_rxed = 1;
413                 ast_cond_signal(&sub->join_cond);
414         }
415
416         /* Since sub is mostly immutable, no need to lock sub */
417         sub->callback(sub->data, sub, message);
418
419         /* Notify that the final message has been processed */
420         if (stasis_subscription_final_message(sub, message)) {
421                 SCOPED_AO2LOCK(lock, sub);
422
423                 sub->final_message_processed = 1;
424                 ast_cond_signal(&sub->join_cond);
425         }
426 }
427
428 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
429 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
430
431 struct stasis_subscription *internal_stasis_subscribe(
432         struct stasis_topic *topic,
433         stasis_subscription_cb callback,
434         void *data,
435         int needs_mailbox)
436 {
437         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
438
439         if (!topic) {
440                 return NULL;
441         }
442
443         /* The ao2 lock is used for join_cond. */
444         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
445         if (!sub) {
446                 return NULL;
447         }
448
449         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
450
451         if (needs_mailbox) {
452                 /* With a small number of subscribers, a thread-per-sub is
453                  * acceptable. If our usage changes so that we have larger
454                  * numbers of subscribers, we'll probably want to consider
455                  * a threadpool. We had that originally, but with so few
456                  * subscribers it was actually a performance loss instead of
457                  * a gain.
458                  */
459                 sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
460                         TPS_REF_DEFAULT);
461                 if (!sub->mailbox) {
462                         return NULL;
463                 }
464                 ast_taskprocessor_set_local(sub->mailbox, sub);
465                 /* Taskprocessor has a reference */
466                 ao2_ref(sub, +1);
467         }
468
469         ao2_ref(topic, +1);
470         sub->topic = topic;
471         sub->callback = callback;
472         sub->data = data;
473         ast_cond_init(&sub->join_cond, NULL);
474
475         if (topic_add_subscription(topic, sub) != 0) {
476                 return NULL;
477         }
478         send_subscription_subscribe(topic, sub);
479
480         ao2_ref(sub, +1);
481         return sub;
482 }
483
484 struct stasis_subscription *stasis_subscribe(
485         struct stasis_topic *topic,
486         stasis_subscription_cb callback,
487         void *data)
488 {
489         return internal_stasis_subscribe(topic, callback, data, 1);
490 }
491
492 static int sub_cleanup(void *data)
493 {
494         struct stasis_subscription *sub = data;
495         ao2_cleanup(sub);
496         return 0;
497 }
498
499 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
500 {
501         /* The subscription may be the last ref to this topic. Hold
502          * the topic ref open until after the unlock. */
503         RAII_VAR(struct stasis_topic *, topic,
504                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
505
506         if (!sub) {
507                 return NULL;
508         }
509
510         /* We have to remove the subscription first, to ensure the unsubscribe
511          * is the final message */
512         if (topic_remove_subscription(sub->topic, sub) != 0) {
513                 ast_log(LOG_ERROR,
514                         "Internal error: subscription has invalid topic\n");
515                 return NULL;
516         }
517
518         /* Now let everyone know about the unsubscribe */
519         send_subscription_unsubscribe(topic, sub);
520
521         /* When all that's done, remove the ref the mailbox has on the sub */
522         if (sub->mailbox) {
523                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
524         }
525
526         /* Unsubscribing unrefs the subscription */
527         ao2_cleanup(sub);
528         return NULL;
529 }
530
531 void stasis_subscription_join(struct stasis_subscription *subscription)
532 {
533         if (subscription) {
534                 SCOPED_AO2LOCK(lock, subscription);
535
536                 /* Wait until the processed flag has been set */
537                 while (!subscription->final_message_processed) {
538                         ast_cond_wait(&subscription->join_cond,
539                                 ao2_object_get_lockaddr(subscription));
540                 }
541         }
542 }
543
544 int stasis_subscription_is_done(struct stasis_subscription *subscription)
545 {
546         if (subscription) {
547                 SCOPED_AO2LOCK(lock, subscription);
548
549                 return subscription->final_message_rxed;
550         }
551
552         /* Null subscription is about as done as you can get */
553         return 1;
554 }
555
556 struct stasis_subscription *stasis_unsubscribe_and_join(
557         struct stasis_subscription *subscription)
558 {
559         if (!subscription) {
560                 return NULL;
561         }
562
563         /* Bump refcount to hold it past the unsubscribe */
564         ao2_ref(subscription, +1);
565         stasis_unsubscribe(subscription);
566         stasis_subscription_join(subscription);
567         /* Now decrement the refcount back */
568         ao2_cleanup(subscription);
569         return NULL;
570 }
571
572 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
573 {
574         if (sub) {
575                 size_t i;
576                 struct stasis_topic *topic = sub->topic;
577                 SCOPED_AO2LOCK(lock_topic, topic);
578
579                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
580                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
581                                 return 1;
582                         }
583                 }
584         }
585
586         return 0;
587 }
588
589 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
590 {
591         return sub->uniqueid;
592 }
593
594 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
595 {
596         struct stasis_subscription_change *change;
597
598         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
599                 return 0;
600         }
601
602         change = stasis_message_data(msg);
603         if (strcmp("Unsubscribe", change->description)) {
604                 return 0;
605         }
606
607         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
608                 return 0;
609         }
610
611         return 1;
612 }
613
614 /*!
615  * \brief Add a subscriber to a topic.
616  * \param topic Topic
617  * \param sub Subscriber
618  * \return 0 on success
619  * \return Non-zero on error
620  */
621 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
622 {
623         size_t idx;
624         SCOPED_AO2LOCK(lock, topic);
625
626         /* The reference from the topic to the subscription is shared with
627          * the owner of the subscription, which will explicitly unsubscribe
628          * to release it.
629          *
630          * If we bumped the refcount here, the owner would have to unsubscribe
631          * and cleanup, which is a bit awkward. */
632         AST_VECTOR_APPEND(&topic->subscribers, sub);
633
634         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
635                 topic_add_subscription(
636                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
637         }
638
639         return 0;
640 }
641
642 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
643 {
644         size_t idx;
645         SCOPED_AO2LOCK(lock_topic, topic);
646
647         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
648                 topic_remove_subscription(
649                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
650         }
651
652         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
653                 AST_VECTOR_ELEM_CLEANUP_NOOP);
654 }
655
656 /*!
657  * \internal \brief Dispatch a message to a subscriber asynchronously
658  * \param local \ref ast_taskprocessor_local object
659  * \return 0
660  */
661 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
662 {
663         struct stasis_subscription *sub = local->local_data;
664         struct stasis_message *message = local->data;
665
666         subscription_invoke(sub, message);
667         ao2_cleanup(message);
668
669         return 0;
670 }
671
672 /*!
673  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
674  * a published message to a subscriber
675  */
676 struct sync_task_data {
677         ast_mutex_t lock;
678         ast_cond_t cond;
679         int complete;
680         void *task_data;
681 };
682
683 /*!
684  * \internal \brief Dispatch a message to a subscriber synchronously
685  * \param local \ref ast_taskprocessor_local object
686  * \return 0
687  */
688 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
689 {
690         struct stasis_subscription *sub = local->local_data;
691         struct sync_task_data *std = local->data;
692         struct stasis_message *message = std->task_data;
693
694         subscription_invoke(sub, message);
695         ao2_cleanup(message);
696
697         ast_mutex_lock(&std->lock);
698         std->complete = 1;
699         ast_cond_signal(&std->cond);
700         ast_mutex_unlock(&std->lock);
701
702         return 0;
703 }
704
705 /*!
706  * \internal \brief Dispatch a message to a subscriber
707  * \param sub The subscriber to dispatch to
708  * \param message The message to send
709  * \param synchronous If non-zero, synchronize on the subscriber receiving
710  * the message
711  */
712 static void dispatch_message(struct stasis_subscription *sub,
713         struct stasis_message *message,
714         int synchronous)
715 {
716         if (!sub->mailbox) {
717                 /* Dispatch directly */
718                 subscription_invoke(sub, message);
719                 return;
720         }
721
722         /* Bump the message for the taskprocessor push. This will get de-ref'd
723          * by the task processor callback.
724          */
725         ao2_bump(message);
726         if (!synchronous) {
727                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
728                         /* Push failed; ugh. */
729                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
730                         ao2_cleanup(message);
731                 }
732         } else {
733                 struct sync_task_data std;
734
735                 ast_mutex_init(&std.lock);
736                 ast_cond_init(&std.cond, NULL);
737                 std.complete = 0;
738                 std.task_data = message;
739
740                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
741                         /* Push failed; ugh. */
742                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
743                         ao2_cleanup(message);
744                         ast_mutex_destroy(&std.lock);
745                         ast_cond_destroy(&std.cond);
746                         return;
747                 }
748
749                 ast_mutex_lock(&std.lock);
750                 while (!std.complete) {
751                         ast_cond_wait(&std.cond, &std.lock);
752                 }
753                 ast_mutex_unlock(&std.lock);
754
755                 ast_mutex_destroy(&std.lock);
756                 ast_cond_destroy(&std.cond);
757         }
758 }
759
760 /*!
761  * \internal \brief Publish a message to a topic's subscribers
762  * \brief topic The topic to publish to
763  * \brief message The message to publish
764  * \brief sync_sub An optional subscriber of the topic to publish synchronously
765  * to
766  */
767 static void publish_msg(struct stasis_topic *topic,
768         struct stasis_message *message, struct stasis_subscription *sync_sub)
769 {
770         size_t i;
771
772         ast_assert(topic != NULL);
773         ast_assert(message != NULL);
774
775         /*
776          * The topic may be unref'ed by the subscription invocation.
777          * Make sure we hold onto a reference while dispatching.
778          */
779         ao2_ref(topic, +1);
780         ao2_lock(topic);
781         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
782                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
783
784                 ast_assert(sub != NULL);
785
786                 dispatch_message(sub, message, (sub == sync_sub));
787         }
788         ao2_unlock(topic);
789         ao2_ref(topic, -1);
790 }
791
792 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
793 {
794         publish_msg(topic, message, NULL);
795 }
796
797 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
798 {
799         ast_assert(sub != NULL);
800
801         publish_msg(sub->topic, message, sub);
802 }
803
804 /*!
805  * \brief Forwarding information
806  *
807  * Any message posted to \a from_topic is forwarded to \a to_topic.
808  *
809  * In cases where both the \a from_topic and \a to_topic need to be locked,
810  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
811  */
812 struct stasis_forward {
813         /*! Originating topic */
814         struct stasis_topic *from_topic;
815         /*! Destination topic */
816         struct stasis_topic *to_topic;
817 };
818
819 static void forward_dtor(void *obj)
820 {
821         struct stasis_forward *forward = obj;
822
823         ao2_cleanup(forward->from_topic);
824         forward->from_topic = NULL;
825         ao2_cleanup(forward->to_topic);
826         forward->to_topic = NULL;
827 }
828
829 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
830 {
831         int idx;
832         struct stasis_topic *from;
833         struct stasis_topic *to;
834
835         if (!forward) {
836                 return NULL;
837         }
838
839         from = forward->from_topic;
840         to = forward->to_topic;
841
842         if (from && to) {
843                 topic_lock_both(to, from);
844                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
845                         AST_VECTOR_ELEM_CLEANUP_NOOP);
846
847                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
848                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
849                 }
850                 ao2_unlock(from);
851                 ao2_unlock(to);
852         }
853
854         ao2_cleanup(forward);
855
856         return NULL;
857 }
858
859 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
860         struct stasis_topic *to_topic)
861 {
862         int res;
863         size_t idx;
864         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
865
866         if (!from_topic || !to_topic) {
867                 return NULL;
868         }
869
870         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
871         if (!forward) {
872                 return NULL;
873         }
874
875         /* Forwards to ourselves are implicit. */
876         if (to_topic == from_topic) {
877                 return ao2_bump(forward);
878         }
879
880         forward->from_topic = ao2_bump(from_topic);
881         forward->to_topic = ao2_bump(to_topic);
882
883         topic_lock_both(to_topic, from_topic);
884         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
885         if (res != 0) {
886                 ao2_unlock(from_topic);
887                 ao2_unlock(to_topic);
888                 return NULL;
889         }
890
891         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
892                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
893         }
894         ao2_unlock(from_topic);
895         ao2_unlock(to_topic);
896
897         return ao2_bump(forward);
898 }
899
900 static void subscription_change_dtor(void *obj)
901 {
902         struct stasis_subscription_change *change = obj;
903
904         ast_string_field_free_memory(change);
905         ao2_cleanup(change->topic);
906 }
907
908 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
909 {
910         struct stasis_subscription_change *change;
911
912         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
913         if (!change || ast_string_field_init(change, 128)) {
914                 ao2_cleanup(change);
915                 return NULL;
916         }
917
918         ast_string_field_set(change, uniqueid, uniqueid);
919         ast_string_field_set(change, description, description);
920         ao2_ref(topic, +1);
921         change->topic = topic;
922
923         return change;
924 }
925
926 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
927 {
928         struct stasis_subscription_change *change;
929         struct stasis_message *msg;
930
931         /* This assumes that we have already unsubscribed */
932         ast_assert(stasis_subscription_is_subscribed(sub));
933
934         if (!stasis_subscription_change_type()) {
935                 return;
936         }
937
938         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
939         if (!change) {
940                 return;
941         }
942
943         msg = stasis_message_create(stasis_subscription_change_type(), change);
944         if (!msg) {
945                 ao2_cleanup(change);
946                 return;
947         }
948
949         stasis_publish(topic, msg);
950         ao2_cleanup(msg);
951         ao2_cleanup(change);
952 }
953
954 static void send_subscription_unsubscribe(struct stasis_topic *topic,
955         struct stasis_subscription *sub)
956 {
957         struct stasis_subscription_change *change;
958         struct stasis_message *msg;
959
960         /* This assumes that we have already unsubscribed */
961         ast_assert(!stasis_subscription_is_subscribed(sub));
962
963         if (!stasis_subscription_change_type()) {
964                 return;
965         }
966
967         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
968         if (!change) {
969                 return;
970         }
971
972         msg = stasis_message_create(stasis_subscription_change_type(), change);
973         if (!msg) {
974                 ao2_cleanup(change);
975                 return;
976         }
977
978         stasis_publish(topic, msg);
979
980         /* Now we have to dispatch to the subscription itself */
981         dispatch_message(sub, msg, 0);
982
983         ao2_cleanup(msg);
984         ao2_cleanup(change);
985 }
986
987 struct topic_pool_entry {
988         struct stasis_forward *forward;
989         struct stasis_topic *topic;
990 };
991
992 static void topic_pool_entry_dtor(void *obj)
993 {
994         struct topic_pool_entry *entry = obj;
995
996         entry->forward = stasis_forward_cancel(entry->forward);
997         ao2_cleanup(entry->topic);
998         entry->topic = NULL;
999 }
1000
1001 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1002 {
1003         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1004                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1005 }
1006
1007 struct stasis_topic_pool {
1008         struct ao2_container *pool_container;
1009         struct stasis_topic *pool_topic;
1010 };
1011
1012 static void topic_pool_dtor(void *obj)
1013 {
1014         struct stasis_topic_pool *pool = obj;
1015
1016         ao2_cleanup(pool->pool_container);
1017         pool->pool_container = NULL;
1018         ao2_cleanup(pool->pool_topic);
1019         pool->pool_topic = NULL;
1020 }
1021
1022 static int topic_pool_entry_hash(const void *obj, const int flags)
1023 {
1024         const struct topic_pool_entry *object;
1025         const char *key;
1026
1027         switch (flags & OBJ_SEARCH_MASK) {
1028         case OBJ_SEARCH_KEY:
1029                 key = obj;
1030                 break;
1031         case OBJ_SEARCH_OBJECT:
1032                 object = obj;
1033                 key = stasis_topic_name(object->topic);
1034                 break;
1035         default:
1036                 /* Hash can only work on something with a full key. */
1037                 ast_assert(0);
1038                 return 0;
1039         }
1040         return ast_str_case_hash(key);
1041 }
1042
1043 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1044 {
1045         const struct topic_pool_entry *object_left = obj;
1046         const struct topic_pool_entry *object_right = arg;
1047         const char *right_key = arg;
1048         int cmp;
1049
1050         switch (flags & OBJ_SEARCH_MASK) {
1051         case OBJ_SEARCH_OBJECT:
1052                 right_key = stasis_topic_name(object_right->topic);
1053                 /* Fall through */
1054         case OBJ_SEARCH_KEY:
1055                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1056                 break;
1057         case OBJ_SEARCH_PARTIAL_KEY:
1058                 /* Not supported by container */
1059                 ast_assert(0);
1060                 cmp = -1;
1061                 break;
1062         default:
1063                 /*
1064                  * What arg points to is specific to this traversal callback
1065                  * and has no special meaning to astobj2.
1066                  */
1067                 cmp = 0;
1068                 break;
1069         }
1070         if (cmp) {
1071                 return 0;
1072         }
1073         /*
1074          * At this point the traversal callback is identical to a sorted
1075          * container.
1076          */
1077         return CMP_MATCH;
1078 }
1079
1080 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1081 {
1082         struct stasis_topic_pool *pool;
1083
1084         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1085         if (!pool) {
1086                 return NULL;
1087         }
1088
1089         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1090                 topic_pool_entry_hash, topic_pool_entry_cmp);
1091         if (!pool->pool_container) {
1092                 ao2_cleanup(pool);
1093                 return NULL;
1094         }
1095         ao2_ref(pooled_topic, +1);
1096         pool->pool_topic = pooled_topic;
1097
1098         return pool;
1099 }
1100
1101 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1102 {
1103         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1104         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1105
1106         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1107         if (topic_pool_entry) {
1108                 return topic_pool_entry->topic;
1109         }
1110
1111         topic_pool_entry = topic_pool_entry_alloc();
1112         if (!topic_pool_entry) {
1113                 return NULL;
1114         }
1115
1116         topic_pool_entry->topic = stasis_topic_create(topic_name);
1117         if (!topic_pool_entry->topic) {
1118                 return NULL;
1119         }
1120
1121         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1122         if (!topic_pool_entry->forward) {
1123                 return NULL;
1124         }
1125
1126         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1127                 return NULL;
1128         }
1129
1130         return topic_pool_entry->topic;
1131 }
1132
1133 void stasis_log_bad_type_access(const char *name)
1134 {
1135         if (!stasis_message_type_declined(name)) {
1136                 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1137         }
1138 }
1139
1140 /*! \brief A multi object blob data structure to carry user event stasis messages */
1141 struct ast_multi_object_blob {
1142         struct ast_json *blob;                             /*< A blob of JSON data */
1143         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1144 };
1145
1146 /*!
1147  * \internal
1148  * \brief Destructor for \ref ast_multi_object_blob objects
1149  */
1150 static void multi_object_blob_dtor(void *obj)
1151 {
1152         struct ast_multi_object_blob *multi = obj;
1153         int type;
1154         int i;
1155
1156         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1157                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1158                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1159                 }
1160                 AST_VECTOR_FREE(&multi->snapshots[type]);
1161         }
1162         ast_json_unref(multi->blob);
1163 }
1164
1165 /*! \brief Create a stasis user event multi object blob */
1166 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1167 {
1168         int type;
1169         RAII_VAR(struct ast_multi_object_blob *, multi,
1170                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1171                         ao2_cleanup);
1172
1173         ast_assert(blob != NULL);
1174
1175         if (!multi) {
1176                 return NULL;
1177         }
1178
1179         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1180                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1181                         return NULL;
1182                 }
1183         }
1184
1185         multi->blob = ast_json_ref(blob);
1186
1187         ao2_ref(multi, +1);
1188         return multi;
1189 }
1190
1191 /*! \brief Add an object (snapshot) to the blob */
1192 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1193         enum stasis_user_multi_object_snapshot_type type, void *object)
1194 {
1195         if (!multi || !object) {
1196                 return;
1197         }
1198         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1199 }
1200
1201 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1202 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1203         struct stasis_message_type *type, struct ast_json *blob)
1204 {
1205         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1206         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1207         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1208
1209         if (!type) {
1210                 return;
1211         }
1212
1213         multi = ast_multi_object_blob_create(blob);
1214         if (!multi) {
1215                 return;
1216         }
1217
1218         channel_snapshot = ast_channel_snapshot_create(chan);
1219         ao2_ref(channel_snapshot, +1);
1220         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1221
1222         message = stasis_message_create(type, multi);
1223         if (message) {
1224                 /* app_userevent still publishes to channel */
1225                 stasis_publish(ast_channel_topic(chan), message);
1226         }
1227 }
1228
1229 /*! \internal \brief convert multi object blob to ari json */
1230 static struct ast_json *multi_user_event_to_json(
1231         struct stasis_message *message,
1232         const struct stasis_message_sanitizer *sanitize)
1233 {
1234         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1235         struct ast_multi_object_blob *multi = stasis_message_data(message);
1236         struct ast_json *blob = multi->blob;
1237         const struct timeval *tv = stasis_message_timestamp(message);
1238         enum stasis_user_multi_object_snapshot_type type;
1239         int i;
1240
1241         out = ast_json_object_create();
1242         if (!out) {
1243                 return NULL;
1244         }
1245
1246         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1247         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1248         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1249         ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */
1250
1251         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1252                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1253                         struct ast_json *json_object = NULL;
1254                         char *name = NULL;
1255                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1256
1257                         switch (type) {
1258                         case STASIS_UMOS_CHANNEL:
1259                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1260                                 name = "channel";
1261                                 break;
1262                         case STASIS_UMOS_BRIDGE:
1263                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1264                                 name = "bridge";
1265                                 break;
1266                         case STASIS_UMOS_ENDPOINT:
1267                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1268                                 name = "endpoint";
1269                                 break;
1270                         }
1271                         if (json_object) {
1272                                 ast_json_object_set(out, name, json_object);
1273                         }
1274                 }
1275         }
1276         return ast_json_ref(out);
1277 }
1278
1279 /*! \internal \brief convert multi object blob to ami string */
1280 static struct ast_str *multi_object_blob_to_ami(void *obj)
1281 {
1282         struct ast_str *ami_str=ast_str_create(1024);
1283         struct ast_str *ami_snapshot;
1284         const struct ast_multi_object_blob *multi = obj;
1285         enum stasis_user_multi_object_snapshot_type type;
1286         int i;
1287
1288         if (!ami_str) {
1289                 return NULL;
1290         }
1291         if (!multi) {
1292                 ast_free(ami_str);
1293                 return NULL;
1294         }
1295
1296         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1297                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1298                         char *name = "";
1299                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1300                         ami_snapshot = NULL;
1301
1302                         if (i > 0) {
1303                                 ast_asprintf(&name, "%d", i + 1);
1304                         }
1305
1306                         switch (type) {
1307                         case STASIS_UMOS_CHANNEL:
1308                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1309                                 break;
1310
1311                         case STASIS_UMOS_BRIDGE:
1312                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1313                                 break;
1314
1315                         case STASIS_UMOS_ENDPOINT:
1316                                 /* currently not sending endpoint snapshots to AMI */
1317                                 break;
1318                         }
1319                         if (ami_snapshot) {
1320                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1321                                 ast_free(ami_snapshot);
1322                         }
1323                 }
1324         }
1325
1326         return ami_str;
1327 }
1328
1329 /*! \internal \brief Callback to pass only user defined parameters from blob */
1330 static int userevent_exclusion_cb(const char *key)
1331 {
1332         if (!strcmp("eventname", key)) {
1333                 return 1;
1334         }
1335         return 0;
1336 }
1337
1338 static struct ast_manager_event_blob *multi_user_event_to_ami(
1339         struct stasis_message *message)
1340 {
1341         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1342         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1343         struct ast_multi_object_blob *multi = stasis_message_data(message);
1344         const char *eventname;
1345
1346         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1347         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1348         object_string = multi_object_blob_to_ami(multi);
1349         if (!object_string || !body) {
1350                 return NULL;
1351         }
1352
1353         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1354                 "%s"
1355                 "UserEvent: %s\r\n"
1356                 "%s",
1357                 ast_str_buffer(object_string),
1358                 eventname,
1359                 ast_str_buffer(body));
1360 }
1361
1362 /*! \brief A structure to hold global configuration-related options */
1363 struct stasis_declined_config {
1364         /*! The list of message types to decline */
1365         struct ao2_container *declined;
1366 };
1367
1368
1369 struct stasis_config {
1370         struct stasis_declined_config *declined_message_types;
1371 };
1372
1373 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1374 static struct aco_type declined_option = {
1375         .type = ACO_GLOBAL,
1376         .name = "declined_message_types",
1377         .item_offset = offsetof(struct stasis_config, declined_message_types),
1378         .category_match = ACO_WHITELIST,
1379         .category = "^declined_message_types$",
1380 };
1381
1382 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1383
1384 struct aco_file stasis_conf = {
1385         .filename = "stasis.conf",
1386         .types = ACO_TYPES(&declined_option),
1387 };
1388
1389 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1390 static AO2_GLOBAL_OBJ_STATIC(globals);
1391
1392 static void *stasis_config_alloc(void);
1393
1394 /*! \brief Register information about the configs being processed by this module */
1395 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1396         .files = ACO_FILES(&stasis_conf),
1397 );
1398
1399 static void stasis_declined_config_destructor(void *obj)
1400 {
1401         struct stasis_declined_config *declined = obj;
1402         ao2_cleanup(declined->declined);
1403 }
1404
1405 static void stasis_config_destructor(void *obj)
1406 {
1407         struct stasis_config *cfg = obj;
1408         ao2_cleanup(cfg->declined_message_types);
1409 }
1410
1411 static void *stasis_config_alloc(void)
1412 {
1413         struct stasis_config *cfg;
1414
1415         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1416                 return NULL;
1417         }
1418
1419         /* Allocate/initialize memory */
1420         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor);
1421         if (!cfg->declined_message_types) {
1422                 goto error;
1423         }
1424
1425         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1426         if (!cfg->declined_message_types->declined) {
1427                 goto error;
1428         }
1429
1430         return cfg;
1431 error:
1432         ao2_ref(cfg, -1);
1433         return NULL;
1434 }
1435
1436 int stasis_message_type_declined(const char *name)
1437 {
1438         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1439         char *name_in_declined;
1440         int res;
1441
1442         if (!cfg || !cfg->declined_message_types) {
1443                 return 0;
1444         }
1445
1446         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1447         res = name_in_declined ? 1 : 0;
1448         ao2_cleanup(name_in_declined);
1449         if (res) {
1450                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1451         }
1452         return res;
1453 }
1454
1455 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1456 {
1457         struct stasis_declined_config *declined = obj;
1458
1459         if (ast_strlen_zero(var->value)) {
1460                 return 0;
1461         }
1462
1463         if (ast_str_container_add(declined->declined, var->value)) {
1464                 return -1;
1465         }
1466
1467         return 0;
1468 }
1469
1470 /*!
1471  * @{ \brief Define multi user event message type(s).
1472  */
1473
1474 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1475         .to_json = multi_user_event_to_json,
1476         .to_ami = multi_user_event_to_ami,
1477         );
1478
1479 /*! @} */
1480
1481 /*! \brief Cleanup function for graceful shutdowns */
1482 static void stasis_cleanup(void)
1483 {
1484         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1485         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1486         aco_info_destroy(&cfg_info);
1487         ao2_global_obj_release(globals);
1488 }
1489
1490 int stasis_init(void)
1491 {
1492         int cache_init;
1493
1494         /* Be sure the types are cleaned up after the message bus */
1495         ast_register_cleanup(stasis_cleanup);
1496
1497         if (aco_info_init(&cfg_info)) {
1498                 return -1;
1499         }
1500
1501         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0);
1502
1503         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1504                 RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup);
1505
1506                 if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) {
1507                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1508                         return -1;
1509                 }
1510
1511                 ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n");
1512                 ao2_global_obj_replace_unref(globals, stasis_cfg);
1513         }
1514
1515         cache_init = stasis_cache_init();
1516         if (cache_init != 0) {
1517                 return -1;
1518         }
1519
1520         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1521                 return -1;
1522         }
1523         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1524                 return -1;
1525         }
1526
1527         return 0;
1528 }
1529