Replace most uses of ast_register_atexit with ast_register_cleanup.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/threadpool.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/vector.h"
42 #include "asterisk/stasis_channels.h"
43 #include "asterisk/stasis_bridges.h"
44 #include "asterisk/stasis_endpoints.h"
45 #include "asterisk/config_options.h"
46
47 /*** DOCUMENTATION
48         <managerEvent language="en_US" name="UserEvent">
49                 <managerEventInstance class="EVENT_FLAG_USER">
50                         <synopsis>A user defined event raised from the dialplan.</synopsis>
51                         <syntax>
52                                 <channel_snapshot/>
53                                 <parameter name="UserEvent">
54                                         <para>The event name, as specified in the dialplan.</para>
55                                 </parameter>
56                         </syntax>
57                         <description>
58                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
59                         </description>
60                         <see-also>
61                                 <ref type="application">UserEvent</ref>
62                         </see-also>
63                 </managerEventInstance>
64         </managerEvent>
65         <configInfo name="stasis" language="en_US">
66                 <configFile name="stasis.conf">
67                         <configObject name="threadpool">
68                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69                                 <configOption name="initial_size" default="5">
70                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71                                 </configOption>
72                                 <configOption name="idle_timeout_sec" default="20">
73                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74                                 </configOption>
75                                 <configOption name="max_size" default="50">
76                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
77                                 </configOption>
78                         </configObject>
79                         <configObject name="declined_message_types">
80                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
81                                 <configOption name="decline">
82                                         <synopsis>The message type to decline.</synopsis>
83                                         <description>
84                                                 <para>This configuration option defines the name of the Stasis
85                                                 message type that Asterisk is forbidden from creating and can be
86                                                 specified as many times as necessary to achieve the desired result.</para>
87                                                 <enumlist>
88                                                         <enum name="stasis_app_recording_snapshot_type" />
89                                                         <enum name="stasis_app_playback_snapshot_type" />
90                                                         <enum name="stasis_test_message_type" />
91                                                         <enum name="confbridge_start_type" />
92                                                         <enum name="confbridge_end_type" />
93                                                         <enum name="confbridge_join_type" />
94                                                         <enum name="confbridge_leave_type" />
95                                                         <enum name="confbridge_start_record_type" />
96                                                         <enum name="confbridge_stop_record_type" />
97                                                         <enum name="confbridge_mute_type" />
98                                                         <enum name="confbridge_unmute_type" />
99                                                         <enum name="confbridge_talking_type" />
100                                                         <enum name="cel_generic_type" />
101                                                         <enum name="ast_bridge_snapshot_type" />
102                                                         <enum name="ast_bridge_merge_message_type" />
103                                                         <enum name="ast_channel_entered_bridge_type" />
104                                                         <enum name="ast_channel_left_bridge_type" />
105                                                         <enum name="ast_blind_transfer_type" />
106                                                         <enum name="ast_attended_transfer_type" />
107                                                         <enum name="ast_endpoint_snapshot_type" />
108                                                         <enum name="ast_endpoint_state_type" />
109                                                         <enum name="ast_device_state_message_type" />
110                                                         <enum name="ast_test_suite_message_type" />
111                                                         <enum name="ast_mwi_state_type" />
112                                                         <enum name="ast_mwi_vm_app_type" />
113                                                         <enum name="ast_format_register_type" />
114                                                         <enum name="ast_format_unregister_type" />
115                                                         <enum name="ast_manager_get_generic_type" />
116                                                         <enum name="ast_parked_call_type" />
117                                                         <enum name="ast_channel_snapshot_type" />
118                                                         <enum name="ast_channel_dial_type" />
119                                                         <enum name="ast_channel_varset_type" />
120                                                         <enum name="ast_channel_hangup_request_type" />
121                                                         <enum name="ast_channel_dtmf_begin_type" />
122                                                         <enum name="ast_channel_dtmf_end_type" />
123                                                         <enum name="ast_channel_hold_type" />
124                                                         <enum name="ast_channel_unhold_type" />
125                                                         <enum name="ast_channel_chanspy_start_type" />
126                                                         <enum name="ast_channel_chanspy_stop_type" />
127                                                         <enum name="ast_channel_fax_type" />
128                                                         <enum name="ast_channel_hangup_handler_type" />
129                                                         <enum name="ast_channel_moh_start_type" />
130                                                         <enum name="ast_channel_moh_stop_type" />
131                                                         <enum name="ast_channel_monitor_start_type" />
132                                                         <enum name="ast_channel_monitor_stop_type" />
133                                                         <enum name="ast_channel_agent_login_type" />
134                                                         <enum name="ast_channel_agent_logoff_type" />
135                                                         <enum name="ast_channel_talking_start" />
136                                                         <enum name="ast_channel_talking_stop" />
137                                                         <enum name="ast_security_event_type" />
138                                                         <enum name="ast_named_acl_change_type" />
139                                                         <enum name="ast_local_bridge_type" />
140                                                         <enum name="ast_local_optimization_begin_type" />
141                                                         <enum name="ast_local_optimization_end_type" />
142                                                         <enum name="stasis_subscription_change_type" />
143                                                         <enum name="ast_multi_user_event_type" />
144                                                         <enum name="stasis_cache_clear_type" />
145                                                         <enum name="stasis_cache_update_type" />
146                                                         <enum name="ast_network_change_type" />
147                                                         <enum name="ast_system_registry_type" />
148                                                         <enum name="ast_cc_available_type" />
149                                                         <enum name="ast_cc_offertimerstart_type" />
150                                                         <enum name="ast_cc_requested_type" />
151                                                         <enum name="ast_cc_requestacknowledged_type" />
152                                                         <enum name="ast_cc_callerstopmonitoring_type" />
153                                                         <enum name="ast_cc_callerstartmonitoring_type" />
154                                                         <enum name="ast_cc_callerrecalling_type" />
155                                                         <enum name="ast_cc_recallcomplete_type" />
156                                                         <enum name="ast_cc_failure_type" />
157                                                         <enum name="ast_cc_monitorfailed_type" />
158                                                         <enum name="ast_presence_state_message_type" />
159                                                         <enum name="ast_rtp_rtcp_sent_type" />
160                                                         <enum name="ast_rtp_rtcp_received_type" />
161                                                         <enum name="ast_call_pickup_type" />
162                                                         <enum name="aoc_s_type" />
163                                                         <enum name="aoc_d_type" />
164                                                         <enum name="aoc_e_type" />
165                                                         <enum name="dahdichannel_type" />
166                                                         <enum name="mcid_type" />
167                                                         <enum name="session_timeout_type" />
168                                                         <enum name="cdr_read_message_type" />
169                                                         <enum name="cdr_write_message_type" />
170                                                         <enum name="cdr_prop_write_message_type" />
171                                                         <enum name="corosync_ping_message_type" />
172                                                         <enum name="agi_exec_start_type" />
173                                                         <enum name="agi_exec_end_type" />
174                                                         <enum name="agi_async_start_type" />
175                                                         <enum name="agi_async_exec_type" />
176                                                         <enum name="agi_async_end_type" />
177                                                         <enum name="queue_caller_join_type" />
178                                                         <enum name="queue_caller_leave_type" />
179                                                         <enum name="queue_caller_abandon_type" />
180                                                         <enum name="queue_member_status_type" />
181                                                         <enum name="queue_member_added_type" />
182                                                         <enum name="queue_member_removed_type" />
183                                                         <enum name="queue_member_pause_type" />
184                                                         <enum name="queue_member_penalty_type" />
185                                                         <enum name="queue_member_ringinuse_type" />
186                                                         <enum name="queue_agent_called_type" />
187                                                         <enum name="queue_agent_connect_type" />
188                                                         <enum name="queue_agent_complete_type" />
189                                                         <enum name="queue_agent_dump_type" />
190                                                         <enum name="queue_agent_ringnoanswer_type" />
191                                                         <enum name="meetme_join_type" />
192                                                         <enum name="meetme_leave_type" />
193                                                         <enum name="meetme_end_type" />
194                                                         <enum name="meetme_mute_type" />
195                                                         <enum name="meetme_talking_type" />
196                                                         <enum name="meetme_talk_request_type" />
197                                                         <enum name="appcdr_message_type" />
198                                                         <enum name="forkcdr_message_type" />
199                                                         <enum name="cdr_sync_message_type" />
200                                                 </enumlist>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                 </configFile>
205         </configInfo>
206 ***/
207
208 /*!
209  * \page stasis-impl Stasis Implementation Notes
210  *
211  * \par Reference counting
212  *
213  * Stasis introduces a number of objects, which are tightly related to one
214  * another. Because we rely on ref-counting for memory management, understanding
215  * these relationships is important to understanding this code.
216  *
217  * \code{.txt}
218  *
219  *   stasis_topic <----> stasis_subscription
220  *             ^          ^
221  *              \        /
222  *               \      /
223  *               dispatch
224  *                  |
225  *                  |
226  *                  v
227  *            stasis_message
228  *                  |
229  *                  |
230  *                  v
231  *          stasis_message_type
232  *
233  * \endcode
234  *
235  * The most troubling thing in this chart is the cyclic reference between
236  * stasis_topic and stasis_subscription. This is both unfortunate, and
237  * necessary. Topics need the subscription in order to dispatch messages;
238  * subscriptions need the topics to unsubscribe and check subscription status.
239  *
240  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
241  * topic's reference to a subscription. When the subcription is destroyed, it
242  * will remove its reference to the topic.
243  *
244  * This means that until a subscription has be explicitly unsubscribed, it will
245  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
246  * The destructors of both have assertions regarding this to catch ref-counting
247  * problems where a subscription or topic has had an extra ao2_cleanup().
248  *
249  * The \ref dispatch object is a transient object, which is posted to a
250  * subscription's taskprocessor to send a message to the subscriber. They have
251  * short life cycles, allocated on one thread, destroyed on another.
252  *
253  * During shutdown, or the deletion of a domain object, there are a flurry of
254  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
255  * are processed. Any one of these cleanups could be the one to actually destroy
256  * a given object, so care must be taken to ensure that an object isn't
257  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
258  * that might happen when a RAII_VAR() goes out of scope.
259  *
260  * \par Typical life cycles
261  *
262  *  \li stasis_topic - There are several topics which live for the duration of
263  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
264  *      are actually fed by shorter-lived topics whose lifetime is associated
265  *      with some domain object (like ast_channel_topic() for a given
266  *      ast_channel).
267  *
268  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
269  *      topics, for similar reasons.
270  *
271  *  \li dispatch - Very short lived; just long enough to post a message to a
272  *      subscriber.
273  *
274  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
275  *      irrelevant. Messages are strictly data and have no behavior associated
276  *      with them, so it doesn't really matter if/when they are destroyed. By
277  *      design, a component could hold a ref to a message forever without any
278  *      ill consequences (aside from consuming more memory).
279  *
280  *  \li stasis_message_type - Long life cycles, typically only destroyed on
281  *      module unloading or _clean_ process exit.
282  *
283  * \par Subscriber shutdown sequencing
284  *
285  * Subscribers are sensitive to shutdown sequencing, specifically in how the
286  * reference message types. This is fully detailed on the wiki at
287  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
288  *
289  * In short, the lifetime of the \a data (and \a callback, if in a module) must
290  * be held until the stasis_subscription_final_message() has been received.
291  * Depending on the structure of the subscriber code, this can be handled by
292  * using stasis_subscription_final_message() to free resources on the final
293  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
294  * block until the unsubscribe has completed.
295  */
296
297 /*! Initial size of the subscribers list. */
298 #define INITIAL_SUBSCRIBERS_MAX 4
299
300 /*! The number of buckets to use for topic pools */
301 #define TOPIC_POOL_BUCKETS 57
302
303 /*! Thread pool for topics that don't want a dedicated taskprocessor */
304 static struct ast_threadpool *pool;
305
306 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
307
308 /*! \internal */
309 struct stasis_topic {
310         char *name;
311         /*! Variable length array of the subscribers */
312         AST_VECTOR(, struct stasis_subscription *) subscribers;
313
314         /*! Topics forwarding into this topic */
315         AST_VECTOR(, struct stasis_topic *) upstream_topics;
316 };
317
318 /* Forward declarations for the tightly-coupled subscription object */
319 static int topic_add_subscription(struct stasis_topic *topic,
320         struct stasis_subscription *sub);
321
322 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
323
324 /*! \brief Lock two topics. */
325 #define topic_lock_both(topic1, topic2) \
326         do { \
327                 ao2_lock(topic1); \
328                 while (ao2_trylock(topic2)) { \
329                         AO2_DEADLOCK_AVOIDANCE(topic1); \
330                 } \
331         } while (0)
332
333 static void topic_dtor(void *obj)
334 {
335         struct stasis_topic *topic = obj;
336
337         /* Subscribers hold a reference to topics, so they should all be
338          * unsubscribed before we get here. */
339         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
340
341         ast_free(topic->name);
342         topic->name = NULL;
343
344         AST_VECTOR_FREE(&topic->subscribers);
345         AST_VECTOR_FREE(&topic->upstream_topics);
346 }
347
348 struct stasis_topic *stasis_topic_create(const char *name)
349 {
350         struct stasis_topic *topic;
351         int res = 0;
352
353         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
354         if (!topic) {
355                 return NULL;
356         }
357
358         topic->name = ast_strdup(name);
359         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
360         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
361         if (!topic->name || res) {
362                 ao2_cleanup(topic);
363                 return NULL;
364         }
365
366         return topic;
367 }
368
369 const char *stasis_topic_name(const struct stasis_topic *topic)
370 {
371         return topic->name;
372 }
373
374 /*! \internal */
375 struct stasis_subscription {
376         /*! Unique ID for this subscription */
377         char uniqueid[AST_UUID_STR_LEN];
378         /*! Topic subscribed to. */
379         struct stasis_topic *topic;
380         /*! Mailbox for processing incoming messages. */
381         struct ast_taskprocessor *mailbox;
382         /*! Callback function for incoming message processing. */
383         stasis_subscription_cb callback;
384         /*! Data pointer to be handed to the callback. */
385         void *data;
386
387         /*! Condition for joining with subscription. */
388         ast_cond_t join_cond;
389         /*! Flag set when final message for sub has been received.
390          *  Be sure join_lock is held before reading/setting. */
391         int final_message_rxed;
392         /*! Flag set when final message for sub has been processed.
393          *  Be sure join_lock is held before reading/setting. */
394         int final_message_processed;
395 };
396
397 static void subscription_dtor(void *obj)
398 {
399         struct stasis_subscription *sub = obj;
400
401         /* Subscriptions need to be manually unsubscribed before destruction
402          * b/c there's a cyclic reference between topics and subscriptions */
403         ast_assert(!stasis_subscription_is_subscribed(sub));
404         /* If there are any messages in flight to this subscription; that would
405          * be bad. */
406         ast_assert(stasis_subscription_is_done(sub));
407
408         ao2_cleanup(sub->topic);
409         sub->topic = NULL;
410         ast_taskprocessor_unreference(sub->mailbox);
411         sub->mailbox = NULL;
412         ast_cond_destroy(&sub->join_cond);
413 }
414
415 /*!
416  * \brief Invoke the subscription's callback.
417  * \param sub Subscription to invoke.
418  * \param topic Topic message was published to.
419  * \param message Message to send.
420  */
421 static void subscription_invoke(struct stasis_subscription *sub,
422                                   struct stasis_message *message)
423 {
424         /* Notify that the final message has been received */
425         if (stasis_subscription_final_message(sub, message)) {
426                 SCOPED_AO2LOCK(lock, sub);
427
428                 sub->final_message_rxed = 1;
429                 ast_cond_signal(&sub->join_cond);
430         }
431
432         /* Since sub is mostly immutable, no need to lock sub */
433         sub->callback(sub->data, sub, message);
434
435         /* Notify that the final message has been processed */
436         if (stasis_subscription_final_message(sub, message)) {
437                 SCOPED_AO2LOCK(lock, sub);
438
439                 sub->final_message_processed = 1;
440                 ast_cond_signal(&sub->join_cond);
441         }
442 }
443
444 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
445 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
446
447 struct stasis_subscription *internal_stasis_subscribe(
448         struct stasis_topic *topic,
449         stasis_subscription_cb callback,
450         void *data,
451         int needs_mailbox,
452         int use_thread_pool)
453 {
454         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
455
456         if (!topic) {
457                 return NULL;
458         }
459
460         /* The ao2 lock is used for join_cond. */
461         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
462         if (!sub) {
463                 return NULL;
464         }
465         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
466
467         if (needs_mailbox) {
468                 /* With a small number of subscribers, a thread-per-sub is
469                  * acceptable. For larger number of subscribers, a thread
470                  * pool should be used.
471                  */
472                 if (use_thread_pool) {
473                         sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
474                 } else {
475                         sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
476                                 TPS_REF_DEFAULT);
477                 }
478                 if (!sub->mailbox) {
479                         return NULL;
480                 }
481                 ast_taskprocessor_set_local(sub->mailbox, sub);
482                 /* Taskprocessor has a reference */
483                 ao2_ref(sub, +1);
484         }
485
486         ao2_ref(topic, +1);
487         sub->topic = topic;
488         sub->callback = callback;
489         sub->data = data;
490         ast_cond_init(&sub->join_cond, NULL);
491
492         if (topic_add_subscription(topic, sub) != 0) {
493                 return NULL;
494         }
495         send_subscription_subscribe(topic, sub);
496
497         ao2_ref(sub, +1);
498         return sub;
499 }
500
501 struct stasis_subscription *stasis_subscribe(
502         struct stasis_topic *topic,
503         stasis_subscription_cb callback,
504         void *data)
505 {
506         return internal_stasis_subscribe(topic, callback, data, 1, 0);
507 }
508
509 struct stasis_subscription *stasis_subscribe_pool(
510         struct stasis_topic *topic,
511         stasis_subscription_cb callback,
512         void *data)
513 {
514         return internal_stasis_subscribe(topic, callback, data, 1, 1);
515 }
516
517 static int sub_cleanup(void *data)
518 {
519         struct stasis_subscription *sub = data;
520         ao2_cleanup(sub);
521         return 0;
522 }
523
524 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
525 {
526         /* The subscription may be the last ref to this topic. Hold
527          * the topic ref open until after the unlock. */
528         RAII_VAR(struct stasis_topic *, topic,
529                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
530
531         if (!sub) {
532                 return NULL;
533         }
534
535         /* We have to remove the subscription first, to ensure the unsubscribe
536          * is the final message */
537         if (topic_remove_subscription(sub->topic, sub) != 0) {
538                 ast_log(LOG_ERROR,
539                         "Internal error: subscription has invalid topic\n");
540                 return NULL;
541         }
542
543         /* Now let everyone know about the unsubscribe */
544         send_subscription_unsubscribe(topic, sub);
545
546         /* When all that's done, remove the ref the mailbox has on the sub */
547         if (sub->mailbox) {
548                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
549         }
550
551         /* Unsubscribing unrefs the subscription */
552         ao2_cleanup(sub);
553         return NULL;
554 }
555
556 void stasis_subscription_join(struct stasis_subscription *subscription)
557 {
558         if (subscription) {
559                 SCOPED_AO2LOCK(lock, subscription);
560
561                 /* Wait until the processed flag has been set */
562                 while (!subscription->final_message_processed) {
563                         ast_cond_wait(&subscription->join_cond,
564                                 ao2_object_get_lockaddr(subscription));
565                 }
566         }
567 }
568
569 int stasis_subscription_is_done(struct stasis_subscription *subscription)
570 {
571         if (subscription) {
572                 SCOPED_AO2LOCK(lock, subscription);
573
574                 return subscription->final_message_rxed;
575         }
576
577         /* Null subscription is about as done as you can get */
578         return 1;
579 }
580
581 struct stasis_subscription *stasis_unsubscribe_and_join(
582         struct stasis_subscription *subscription)
583 {
584         if (!subscription) {
585                 return NULL;
586         }
587
588         /* Bump refcount to hold it past the unsubscribe */
589         ao2_ref(subscription, +1);
590         stasis_unsubscribe(subscription);
591         stasis_subscription_join(subscription);
592         /* Now decrement the refcount back */
593         ao2_cleanup(subscription);
594         return NULL;
595 }
596
597 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
598 {
599         if (sub) {
600                 size_t i;
601                 struct stasis_topic *topic = sub->topic;
602                 SCOPED_AO2LOCK(lock_topic, topic);
603
604                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
605                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
606                                 return 1;
607                         }
608                 }
609         }
610
611         return 0;
612 }
613
614 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
615 {
616         return sub->uniqueid;
617 }
618
619 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
620 {
621         struct stasis_subscription_change *change;
622
623         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
624                 return 0;
625         }
626
627         change = stasis_message_data(msg);
628         if (strcmp("Unsubscribe", change->description)) {
629                 return 0;
630         }
631
632         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
633                 return 0;
634         }
635
636         return 1;
637 }
638
639 /*!
640  * \brief Add a subscriber to a topic.
641  * \param topic Topic
642  * \param sub Subscriber
643  * \return 0 on success
644  * \return Non-zero on error
645  */
646 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
647 {
648         size_t idx;
649         SCOPED_AO2LOCK(lock, topic);
650
651         /* The reference from the topic to the subscription is shared with
652          * the owner of the subscription, which will explicitly unsubscribe
653          * to release it.
654          *
655          * If we bumped the refcount here, the owner would have to unsubscribe
656          * and cleanup, which is a bit awkward. */
657         AST_VECTOR_APPEND(&topic->subscribers, sub);
658
659         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
660                 topic_add_subscription(
661                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
662         }
663
664         return 0;
665 }
666
667 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
668 {
669         size_t idx;
670         SCOPED_AO2LOCK(lock_topic, topic);
671
672         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
673                 topic_remove_subscription(
674                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
675         }
676
677         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
678                 AST_VECTOR_ELEM_CLEANUP_NOOP);
679 }
680
681 /*!
682  * \internal \brief Dispatch a message to a subscriber asynchronously
683  * \param local \ref ast_taskprocessor_local object
684  * \return 0
685  */
686 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
687 {
688         struct stasis_subscription *sub = local->local_data;
689         struct stasis_message *message = local->data;
690
691         subscription_invoke(sub, message);
692         ao2_cleanup(message);
693
694         return 0;
695 }
696
697 /*!
698  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
699  * a published message to a subscriber
700  */
701 struct sync_task_data {
702         ast_mutex_t lock;
703         ast_cond_t cond;
704         int complete;
705         void *task_data;
706 };
707
708 /*!
709  * \internal \brief Dispatch a message to a subscriber synchronously
710  * \param local \ref ast_taskprocessor_local object
711  * \return 0
712  */
713 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
714 {
715         struct stasis_subscription *sub = local->local_data;
716         struct sync_task_data *std = local->data;
717         struct stasis_message *message = std->task_data;
718
719         subscription_invoke(sub, message);
720         ao2_cleanup(message);
721
722         ast_mutex_lock(&std->lock);
723         std->complete = 1;
724         ast_cond_signal(&std->cond);
725         ast_mutex_unlock(&std->lock);
726
727         return 0;
728 }
729
730 /*!
731  * \internal \brief Dispatch a message to a subscriber
732  * \param sub The subscriber to dispatch to
733  * \param message The message to send
734  * \param synchronous If non-zero, synchronize on the subscriber receiving
735  * the message
736  */
737 static void dispatch_message(struct stasis_subscription *sub,
738         struct stasis_message *message,
739         int synchronous)
740 {
741         if (!sub->mailbox) {
742                 /* Dispatch directly */
743                 subscription_invoke(sub, message);
744                 return;
745         }
746
747         /* Bump the message for the taskprocessor push. This will get de-ref'd
748          * by the task processor callback.
749          */
750         ao2_bump(message);
751         if (!synchronous) {
752                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
753                         /* Push failed; ugh. */
754                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
755                         ao2_cleanup(message);
756                 }
757         } else {
758                 struct sync_task_data std;
759
760                 ast_mutex_init(&std.lock);
761                 ast_cond_init(&std.cond, NULL);
762                 std.complete = 0;
763                 std.task_data = message;
764
765                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
766                         /* Push failed; ugh. */
767                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
768                         ao2_cleanup(message);
769                         ast_mutex_destroy(&std.lock);
770                         ast_cond_destroy(&std.cond);
771                         return;
772                 }
773
774                 ast_mutex_lock(&std.lock);
775                 while (!std.complete) {
776                         ast_cond_wait(&std.cond, &std.lock);
777                 }
778                 ast_mutex_unlock(&std.lock);
779
780                 ast_mutex_destroy(&std.lock);
781                 ast_cond_destroy(&std.cond);
782         }
783 }
784
785 /*!
786  * \internal \brief Publish a message to a topic's subscribers
787  * \brief topic The topic to publish to
788  * \brief message The message to publish
789  * \brief sync_sub An optional subscriber of the topic to publish synchronously
790  * to
791  */
792 static void publish_msg(struct stasis_topic *topic,
793         struct stasis_message *message, struct stasis_subscription *sync_sub)
794 {
795         size_t i;
796
797         ast_assert(topic != NULL);
798         ast_assert(message != NULL);
799
800         /*
801          * The topic may be unref'ed by the subscription invocation.
802          * Make sure we hold onto a reference while dispatching.
803          */
804         ao2_ref(topic, +1);
805         ao2_lock(topic);
806         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
807                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
808
809                 ast_assert(sub != NULL);
810
811                 dispatch_message(sub, message, (sub == sync_sub));
812         }
813         ao2_unlock(topic);
814         ao2_ref(topic, -1);
815 }
816
817 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
818 {
819         publish_msg(topic, message, NULL);
820 }
821
822 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
823 {
824         ast_assert(sub != NULL);
825
826         publish_msg(sub->topic, message, sub);
827 }
828
829 /*!
830  * \brief Forwarding information
831  *
832  * Any message posted to \a from_topic is forwarded to \a to_topic.
833  *
834  * In cases where both the \a from_topic and \a to_topic need to be locked,
835  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
836  */
837 struct stasis_forward {
838         /*! Originating topic */
839         struct stasis_topic *from_topic;
840         /*! Destination topic */
841         struct stasis_topic *to_topic;
842 };
843
844 static void forward_dtor(void *obj)
845 {
846         struct stasis_forward *forward = obj;
847
848         ao2_cleanup(forward->from_topic);
849         forward->from_topic = NULL;
850         ao2_cleanup(forward->to_topic);
851         forward->to_topic = NULL;
852 }
853
854 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
855 {
856         int idx;
857         struct stasis_topic *from;
858         struct stasis_topic *to;
859
860         if (!forward) {
861                 return NULL;
862         }
863
864         from = forward->from_topic;
865         to = forward->to_topic;
866
867         if (from && to) {
868                 topic_lock_both(to, from);
869                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
870                         AST_VECTOR_ELEM_CLEANUP_NOOP);
871
872                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
873                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
874                 }
875                 ao2_unlock(from);
876                 ao2_unlock(to);
877         }
878
879         ao2_cleanup(forward);
880
881         return NULL;
882 }
883
884 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
885         struct stasis_topic *to_topic)
886 {
887         int res;
888         size_t idx;
889         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
890
891         if (!from_topic || !to_topic) {
892                 return NULL;
893         }
894
895         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
896         if (!forward) {
897                 return NULL;
898         }
899
900         /* Forwards to ourselves are implicit. */
901         if (to_topic == from_topic) {
902                 return ao2_bump(forward);
903         }
904
905         forward->from_topic = ao2_bump(from_topic);
906         forward->to_topic = ao2_bump(to_topic);
907
908         topic_lock_both(to_topic, from_topic);
909         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
910         if (res != 0) {
911                 ao2_unlock(from_topic);
912                 ao2_unlock(to_topic);
913                 return NULL;
914         }
915
916         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
917                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
918         }
919         ao2_unlock(from_topic);
920         ao2_unlock(to_topic);
921
922         return ao2_bump(forward);
923 }
924
925 static void subscription_change_dtor(void *obj)
926 {
927         struct stasis_subscription_change *change = obj;
928
929         ast_string_field_free_memory(change);
930         ao2_cleanup(change->topic);
931 }
932
933 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
934 {
935         struct stasis_subscription_change *change;
936
937         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
938         if (!change || ast_string_field_init(change, 128)) {
939                 ao2_cleanup(change);
940                 return NULL;
941         }
942
943         ast_string_field_set(change, uniqueid, uniqueid);
944         ast_string_field_set(change, description, description);
945         ao2_ref(topic, +1);
946         change->topic = topic;
947
948         return change;
949 }
950
951 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
952 {
953         struct stasis_subscription_change *change;
954         struct stasis_message *msg;
955
956         /* This assumes that we have already unsubscribed */
957         ast_assert(stasis_subscription_is_subscribed(sub));
958
959         if (!stasis_subscription_change_type()) {
960                 return;
961         }
962
963         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
964         if (!change) {
965                 return;
966         }
967
968         msg = stasis_message_create(stasis_subscription_change_type(), change);
969         if (!msg) {
970                 ao2_cleanup(change);
971                 return;
972         }
973
974         stasis_publish(topic, msg);
975         ao2_cleanup(msg);
976         ao2_cleanup(change);
977 }
978
979 static void send_subscription_unsubscribe(struct stasis_topic *topic,
980         struct stasis_subscription *sub)
981 {
982         struct stasis_subscription_change *change;
983         struct stasis_message *msg;
984
985         /* This assumes that we have already unsubscribed */
986         ast_assert(!stasis_subscription_is_subscribed(sub));
987
988         if (!stasis_subscription_change_type()) {
989                 return;
990         }
991
992         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
993         if (!change) {
994                 return;
995         }
996
997         msg = stasis_message_create(stasis_subscription_change_type(), change);
998         if (!msg) {
999                 ao2_cleanup(change);
1000                 return;
1001         }
1002
1003         stasis_publish(topic, msg);
1004
1005         /* Now we have to dispatch to the subscription itself */
1006         dispatch_message(sub, msg, 0);
1007
1008         ao2_cleanup(msg);
1009         ao2_cleanup(change);
1010 }
1011
1012 struct topic_pool_entry {
1013         struct stasis_forward *forward;
1014         struct stasis_topic *topic;
1015 };
1016
1017 static void topic_pool_entry_dtor(void *obj)
1018 {
1019         struct topic_pool_entry *entry = obj;
1020
1021         entry->forward = stasis_forward_cancel(entry->forward);
1022         ao2_cleanup(entry->topic);
1023         entry->topic = NULL;
1024 }
1025
1026 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1027 {
1028         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1029                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1030 }
1031
1032 struct stasis_topic_pool {
1033         struct ao2_container *pool_container;
1034         struct stasis_topic *pool_topic;
1035 };
1036
1037 static void topic_pool_dtor(void *obj)
1038 {
1039         struct stasis_topic_pool *pool = obj;
1040
1041         ao2_cleanup(pool->pool_container);
1042         pool->pool_container = NULL;
1043         ao2_cleanup(pool->pool_topic);
1044         pool->pool_topic = NULL;
1045 }
1046
1047 static int topic_pool_entry_hash(const void *obj, const int flags)
1048 {
1049         const struct topic_pool_entry *object;
1050         const char *key;
1051
1052         switch (flags & OBJ_SEARCH_MASK) {
1053         case OBJ_SEARCH_KEY:
1054                 key = obj;
1055                 break;
1056         case OBJ_SEARCH_OBJECT:
1057                 object = obj;
1058                 key = stasis_topic_name(object->topic);
1059                 break;
1060         default:
1061                 /* Hash can only work on something with a full key. */
1062                 ast_assert(0);
1063                 return 0;
1064         }
1065         return ast_str_case_hash(key);
1066 }
1067
1068 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1069 {
1070         const struct topic_pool_entry *object_left = obj;
1071         const struct topic_pool_entry *object_right = arg;
1072         const char *right_key = arg;
1073         int cmp;
1074
1075         switch (flags & OBJ_SEARCH_MASK) {
1076         case OBJ_SEARCH_OBJECT:
1077                 right_key = stasis_topic_name(object_right->topic);
1078                 /* Fall through */
1079         case OBJ_SEARCH_KEY:
1080                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1081                 break;
1082         case OBJ_SEARCH_PARTIAL_KEY:
1083                 /* Not supported by container */
1084                 ast_assert(0);
1085                 cmp = -1;
1086                 break;
1087         default:
1088                 /*
1089                  * What arg points to is specific to this traversal callback
1090                  * and has no special meaning to astobj2.
1091                  */
1092                 cmp = 0;
1093                 break;
1094         }
1095         if (cmp) {
1096                 return 0;
1097         }
1098         /*
1099          * At this point the traversal callback is identical to a sorted
1100          * container.
1101          */
1102         return CMP_MATCH;
1103 }
1104
1105 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1106 {
1107         struct stasis_topic_pool *pool;
1108
1109         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1110         if (!pool) {
1111                 return NULL;
1112         }
1113
1114         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1115                 topic_pool_entry_hash, topic_pool_entry_cmp);
1116         if (!pool->pool_container) {
1117                 ao2_cleanup(pool);
1118                 return NULL;
1119         }
1120         ao2_ref(pooled_topic, +1);
1121         pool->pool_topic = pooled_topic;
1122
1123         return pool;
1124 }
1125
1126 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1127 {
1128         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1129         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1130
1131         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1132         if (topic_pool_entry) {
1133                 return topic_pool_entry->topic;
1134         }
1135
1136         topic_pool_entry = topic_pool_entry_alloc();
1137         if (!topic_pool_entry) {
1138                 return NULL;
1139         }
1140
1141         topic_pool_entry->topic = stasis_topic_create(topic_name);
1142         if (!topic_pool_entry->topic) {
1143                 return NULL;
1144         }
1145
1146         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1147         if (!topic_pool_entry->forward) {
1148                 return NULL;
1149         }
1150
1151         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1152                 return NULL;
1153         }
1154
1155         return topic_pool_entry->topic;
1156 }
1157
1158 void stasis_log_bad_type_access(const char *name)
1159 {
1160 #ifdef AST_DEVMODE
1161         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1162 #endif
1163 }
1164
1165 /*! \brief A multi object blob data structure to carry user event stasis messages */
1166 struct ast_multi_object_blob {
1167         struct ast_json *blob;                             /*< A blob of JSON data */
1168         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1169 };
1170
1171 /*!
1172  * \internal
1173  * \brief Destructor for \ref ast_multi_object_blob objects
1174  */
1175 static void multi_object_blob_dtor(void *obj)
1176 {
1177         struct ast_multi_object_blob *multi = obj;
1178         int type;
1179         int i;
1180
1181         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1182                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1183                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1184                 }
1185                 AST_VECTOR_FREE(&multi->snapshots[type]);
1186         }
1187         ast_json_unref(multi->blob);
1188 }
1189
1190 /*! \brief Create a stasis user event multi object blob */
1191 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1192 {
1193         int type;
1194         RAII_VAR(struct ast_multi_object_blob *, multi,
1195                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1196                         ao2_cleanup);
1197
1198         ast_assert(blob != NULL);
1199
1200         if (!multi) {
1201                 return NULL;
1202         }
1203
1204         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1205                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1206                         return NULL;
1207                 }
1208         }
1209
1210         multi->blob = ast_json_ref(blob);
1211
1212         ao2_ref(multi, +1);
1213         return multi;
1214 }
1215
1216 /*! \brief Add an object (snapshot) to the blob */
1217 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1218         enum stasis_user_multi_object_snapshot_type type, void *object)
1219 {
1220         if (!multi || !object) {
1221                 return;
1222         }
1223         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1224 }
1225
1226 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1227 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1228         struct stasis_message_type *type, struct ast_json *blob)
1229 {
1230         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1231         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1232         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1233
1234         if (!type) {
1235                 return;
1236         }
1237
1238         multi = ast_multi_object_blob_create(blob);
1239         if (!multi) {
1240                 return;
1241         }
1242
1243         channel_snapshot = ast_channel_snapshot_create(chan);
1244         ao2_ref(channel_snapshot, +1);
1245         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1246
1247         message = stasis_message_create(type, multi);
1248         if (message) {
1249                 /* app_userevent still publishes to channel */
1250                 stasis_publish(ast_channel_topic(chan), message);
1251         }
1252 }
1253
1254 /*! \internal \brief convert multi object blob to ari json */
1255 static struct ast_json *multi_user_event_to_json(
1256         struct stasis_message *message,
1257         const struct stasis_message_sanitizer *sanitize)
1258 {
1259         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1260         struct ast_multi_object_blob *multi = stasis_message_data(message);
1261         struct ast_json *blob = multi->blob;
1262         const struct timeval *tv = stasis_message_timestamp(message);
1263         enum stasis_user_multi_object_snapshot_type type;
1264         int i;
1265
1266         out = ast_json_object_create();
1267         if (!out) {
1268                 return NULL;
1269         }
1270
1271         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1272         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1273         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1274         ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */
1275
1276         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1277                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1278                         struct ast_json *json_object = NULL;
1279                         char *name = NULL;
1280                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1281
1282                         switch (type) {
1283                         case STASIS_UMOS_CHANNEL:
1284                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1285                                 name = "channel";
1286                                 break;
1287                         case STASIS_UMOS_BRIDGE:
1288                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1289                                 name = "bridge";
1290                                 break;
1291                         case STASIS_UMOS_ENDPOINT:
1292                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1293                                 name = "endpoint";
1294                                 break;
1295                         }
1296                         if (json_object) {
1297                                 ast_json_object_set(out, name, json_object);
1298                         }
1299                 }
1300         }
1301         return ast_json_ref(out);
1302 }
1303
1304 /*! \internal \brief convert multi object blob to ami string */
1305 static struct ast_str *multi_object_blob_to_ami(void *obj)
1306 {
1307         struct ast_str *ami_str=ast_str_create(1024);
1308         struct ast_str *ami_snapshot;
1309         const struct ast_multi_object_blob *multi = obj;
1310         enum stasis_user_multi_object_snapshot_type type;
1311         int i;
1312
1313         if (!ami_str) {
1314                 return NULL;
1315         }
1316         if (!multi) {
1317                 ast_free(ami_str);
1318                 return NULL;
1319         }
1320
1321         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1322                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1323                         char *name = "";
1324                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1325                         ami_snapshot = NULL;
1326
1327                         if (i > 0) {
1328                                 ast_asprintf(&name, "%d", i + 1);
1329                         }
1330
1331                         switch (type) {
1332                         case STASIS_UMOS_CHANNEL:
1333                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1334                                 break;
1335
1336                         case STASIS_UMOS_BRIDGE:
1337                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1338                                 break;
1339
1340                         case STASIS_UMOS_ENDPOINT:
1341                                 /* currently not sending endpoint snapshots to AMI */
1342                                 break;
1343                         }
1344                         if (ami_snapshot) {
1345                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1346                                 ast_free(ami_snapshot);
1347                         }
1348                 }
1349         }
1350
1351         return ami_str;
1352 }
1353
1354 /*! \internal \brief Callback to pass only user defined parameters from blob */
1355 static int userevent_exclusion_cb(const char *key)
1356 {
1357         if (!strcmp("eventname", key)) {
1358                 return 1;
1359         }
1360         return 0;
1361 }
1362
1363 static struct ast_manager_event_blob *multi_user_event_to_ami(
1364         struct stasis_message *message)
1365 {
1366         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1367         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1368         struct ast_multi_object_blob *multi = stasis_message_data(message);
1369         const char *eventname;
1370
1371         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1372         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1373         object_string = multi_object_blob_to_ami(multi);
1374         if (!object_string || !body) {
1375                 return NULL;
1376         }
1377
1378         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1379                 "%s"
1380                 "UserEvent: %s\r\n"
1381                 "%s",
1382                 ast_str_buffer(object_string),
1383                 eventname,
1384                 ast_str_buffer(body));
1385 }
1386
1387 /*! \brief A structure to hold global configuration-related options */
1388 struct stasis_declined_config {
1389         /*! The list of message types to decline */
1390         struct ao2_container *declined;
1391 };
1392
1393 /*! \brief Threadpool configuration options */
1394 struct stasis_threadpool_conf {
1395         /*! Initial size of the thread pool */
1396         int initial_size;
1397         /*! Time, in seconds, before we expire a thread */
1398         int idle_timeout_sec;
1399         /*! Maximum number of thread to allow */
1400         int max_size;
1401 };
1402
1403 struct stasis_config {
1404         /*! Thread pool configuration options */
1405         struct stasis_threadpool_conf *threadpool_options;
1406         /*! Declined message types */
1407         struct stasis_declined_config *declined_message_types;
1408 };
1409
1410 static struct aco_type threadpool_option = {
1411         .type = ACO_GLOBAL,
1412         .name = "threadpool",
1413         .item_offset = offsetof(struct stasis_config, threadpool_options),
1414         .category = "^threadpool$",
1415         .category_match = ACO_WHITELIST,
1416 };
1417
1418 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1419
1420 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1421 static struct aco_type declined_option = {
1422         .type = ACO_GLOBAL,
1423         .name = "declined_message_types",
1424         .item_offset = offsetof(struct stasis_config, declined_message_types),
1425         .category_match = ACO_WHITELIST,
1426         .category = "^declined_message_types$",
1427 };
1428
1429 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1430
1431 struct aco_file stasis_conf = {
1432         .filename = "stasis.conf",
1433         .types = ACO_TYPES(&declined_option, &threadpool_option),
1434 };
1435
1436 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1437 static AO2_GLOBAL_OBJ_STATIC(globals);
1438
1439 static void *stasis_config_alloc(void);
1440
1441 /*! \brief Register information about the configs being processed by this module */
1442 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1443         .files = ACO_FILES(&stasis_conf),
1444 );
1445
1446 static void stasis_declined_config_destructor(void *obj)
1447 {
1448         struct stasis_declined_config *declined = obj;
1449
1450         ao2_cleanup(declined->declined);
1451 }
1452
1453 static void stasis_config_destructor(void *obj)
1454 {
1455         struct stasis_config *cfg = obj;
1456
1457         ao2_cleanup(cfg->declined_message_types);
1458         ast_free(cfg->threadpool_options);
1459 }
1460
1461 static void *stasis_config_alloc(void)
1462 {
1463         struct stasis_config *cfg;
1464
1465         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1466                 return NULL;
1467         }
1468
1469         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1470         if (!cfg->threadpool_options) {
1471                 ao2_ref(cfg, -1);
1472                 return NULL;
1473         }
1474
1475         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1476                 stasis_declined_config_destructor);
1477         if (!cfg->declined_message_types) {
1478                 ao2_ref(cfg, -1);
1479                 return NULL;
1480         }
1481
1482         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1483         if (!cfg->declined_message_types->declined) {
1484                 ao2_ref(cfg, -1);
1485                 return NULL;
1486         }
1487
1488         return cfg;
1489 }
1490
1491 int stasis_message_type_declined(const char *name)
1492 {
1493         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1494         char *name_in_declined;
1495         int res;
1496
1497         if (!cfg || !cfg->declined_message_types) {
1498                 return 0;
1499         }
1500
1501         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1502         res = name_in_declined ? 1 : 0;
1503         ao2_cleanup(name_in_declined);
1504         if (res) {
1505                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1506         }
1507         return res;
1508 }
1509
1510 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1511 {
1512         struct stasis_declined_config *declined = obj;
1513
1514         if (ast_strlen_zero(var->value)) {
1515                 return 0;
1516         }
1517
1518         if (ast_str_container_add(declined->declined, var->value)) {
1519                 return -1;
1520         }
1521
1522         return 0;
1523 }
1524
1525 /*!
1526  * @{ \brief Define multi user event message type(s).
1527  */
1528
1529 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1530         .to_json = multi_user_event_to_json,
1531         .to_ami = multi_user_event_to_ami,
1532         );
1533
1534 /*! @} */
1535
1536 /*! \brief Cleanup function for graceful shutdowns */
1537 static void stasis_cleanup(void)
1538 {
1539         ast_threadpool_shutdown(pool);
1540         pool = NULL;
1541         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1542         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1543         aco_info_destroy(&cfg_info);
1544         ao2_global_obj_release(globals);
1545 }
1546
1547 int stasis_init(void)
1548 {
1549         RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
1550         int cache_init;
1551         struct ast_threadpool_options threadpool_opts = { 0, };
1552
1553         /* Be sure the types are cleaned up after the message bus */
1554         ast_register_cleanup(stasis_cleanup);
1555
1556         if (aco_info_init(&cfg_info)) {
1557                 return -1;
1558         }
1559
1560         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1561                 declined_options, "", declined_handler, 0);
1562         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1563                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1564                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1565                 INT_MAX);
1566         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1567                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1568                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1569                 INT_MAX);
1570         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1571                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1572                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1573                 INT_MAX);
1574
1575         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1576                 struct stasis_config *default_cfg = stasis_config_alloc();
1577
1578                 if (!default_cfg) {
1579                         return -1;
1580                 }
1581
1582                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1583                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1584                         ao2_ref(default_cfg, -1);
1585                         return -1;
1586                 }
1587
1588                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1589                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1590                         return -1;
1591                 }
1592
1593                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1594                 ao2_global_obj_replace_unref(globals, default_cfg);
1595                 cfg = default_cfg;
1596         } else {
1597                 cfg = ao2_global_obj_ref(globals);
1598                 if (!cfg) {
1599                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1600                         return -1;
1601                 }
1602         }
1603
1604         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1605         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1606         threadpool_opts.auto_increment = 1;
1607         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1608         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1609         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1610         if (!pool) {
1611                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1612                 return -1;
1613         }
1614
1615         cache_init = stasis_cache_init();
1616         if (cache_init != 0) {
1617                 return -1;
1618         }
1619
1620         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1621                 return -1;
1622         }
1623         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1624                 return -1;
1625         }
1626
1627         return 0;
1628 }
1629