Remove as much trailing whitespace as possible.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subscription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has been explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there is a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how they
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
/*! Capacity a new topic's subscriber vector is initialized with. */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! Bucket count to use for topic pools. */
#define TOPIC_POOL_BUCKETS 57

/*! Thread pool that serves subscriptions created without a dedicated taskprocessor. */
static struct ast_threadpool *pool;

/* Message type queried through stasis_subscription_change_type(). */
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
307 /*! \internal */
308 struct stasis_topic {
309         char *name;
310         /*! Variable length array of the subscribers */
311         AST_VECTOR(, struct stasis_subscription *) subscribers;
312
313         /*! Topics forwarding into this topic */
314         AST_VECTOR(, struct stasis_topic *) upstream_topics;
315 };
316
/*
 * Prototypes for the topic-side helpers that the subscription code
 * below calls to attach itself to, and detach itself from, a topic.
 */
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static int topic_remove_subscription(struct stasis_topic *topic,
	struct stasis_subscription *sub);
322
/*!
 * \brief Acquire the locks of two topics.
 *
 * Locks \a topic1 outright, then keeps trying \a topic2, running
 * AO2_DEADLOCK_AVOIDANCE() on \a topic1 after every failed attempt.
 */
#define topic_lock_both(topic1, topic2) \
	do { \
		ao2_lock(topic1); \
		while (ao2_trylock(topic2) != 0) { \
			AO2_DEADLOCK_AVOIDANCE(topic1); \
		} \
	} while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 /*! \internal */
374 struct stasis_subscription {
375         /*! Unique ID for this subscription */
376         char uniqueid[AST_UUID_STR_LEN];
377         /*! Topic subscribed to. */
378         struct stasis_topic *topic;
379         /*! Mailbox for processing incoming messages. */
380         struct ast_taskprocessor *mailbox;
381         /*! Callback function for incoming message processing. */
382         stasis_subscription_cb callback;
383         /*! Data pointer to be handed to the callback. */
384         void *data;
385
386         /*! Condition for joining with subscription. */
387         ast_cond_t join_cond;
388         /*! Flag set when final message for sub has been received.
389          *  Be sure join_lock is held before reading/setting. */
390         int final_message_rxed;
391         /*! Flag set when final message for sub has been processed.
392          *  Be sure join_lock is held before reading/setting. */
393         int final_message_processed;
394 };
395
396 static void subscription_dtor(void *obj)
397 {
398         struct stasis_subscription *sub = obj;
399
400         /* Subscriptions need to be manually unsubscribed before destruction
401          * b/c there's a cyclic reference between topics and subscriptions */
402         ast_assert(!stasis_subscription_is_subscribed(sub));
403         /* If there are any messages in flight to this subscription; that would
404          * be bad. */
405         ast_assert(stasis_subscription_is_done(sub));
406
407         ao2_cleanup(sub->topic);
408         sub->topic = NULL;
409         ast_taskprocessor_unreference(sub->mailbox);
410         sub->mailbox = NULL;
411         ast_cond_destroy(&sub->join_cond);
412 }
413
414 /*!
415  * \brief Invoke the subscription's callback.
416  * \param sub Subscription to invoke.
417  * \param topic Topic message was published to.
418  * \param message Message to send.
419  */
420 static void subscription_invoke(struct stasis_subscription *sub,
421                                   struct stasis_message *message)
422 {
423         /* Notify that the final message has been received */
424         if (stasis_subscription_final_message(sub, message)) {
425                 SCOPED_AO2LOCK(lock, sub);
426
427                 sub->final_message_rxed = 1;
428                 ast_cond_signal(&sub->join_cond);
429         }
430
431         /* Since sub is mostly immutable, no need to lock sub */
432         sub->callback(sub->data, sub, message);
433
434         /* Notify that the final message has been processed */
435         if (stasis_subscription_final_message(sub, message)) {
436                 SCOPED_AO2LOCK(lock, sub);
437
438                 sub->final_message_processed = 1;
439                 ast_cond_signal(&sub->join_cond);
440         }
441 }
442
/* Prototypes for the helpers that announce a subscribe/unsubscribe on a topic. */
static void send_subscription_subscribe(struct stasis_topic *topic,
	struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic,
	struct stasis_subscription *sub);
445
/*!
 * \brief Subscription callback that deliberately does nothing.
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	(void) data;
	(void) sub;
	(void) message;
}
449
450 struct stasis_subscription *internal_stasis_subscribe(
451         struct stasis_topic *topic,
452         stasis_subscription_cb callback,
453         void *data,
454         int needs_mailbox,
455         int use_thread_pool)
456 {
457         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
458
459         if (!topic) {
460                 return NULL;
461         }
462
463         /* The ao2 lock is used for join_cond. */
464         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
465         if (!sub) {
466                 return NULL;
467         }
468         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
469
470         if (needs_mailbox) {
471                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
472
473                 /* Create name with seq number appended. */
474                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
475                         use_thread_pool ? 'p' : 'm',
476                         stasis_topic_name(topic));
477
478                 /*
479                  * With a small number of subscribers, a thread-per-sub is
480                  * acceptable. For a large number of subscribers, a thread
481                  * pool should be used.
482                  */
483                 if (use_thread_pool) {
484                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
485                 } else {
486                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
487                 }
488                 if (!sub->mailbox) {
489                         return NULL;
490                 }
491                 ast_taskprocessor_set_local(sub->mailbox, sub);
492                 /* Taskprocessor has a reference */
493                 ao2_ref(sub, +1);
494         }
495
496         ao2_ref(topic, +1);
497         sub->topic = topic;
498         sub->callback = callback;
499         sub->data = data;
500         ast_cond_init(&sub->join_cond, NULL);
501
502         if (topic_add_subscription(topic, sub) != 0) {
503                 return NULL;
504         }
505         send_subscription_subscribe(topic, sub);
506
507         ao2_ref(sub, +1);
508         return sub;
509 }
510
511 struct stasis_subscription *stasis_subscribe(
512         struct stasis_topic *topic,
513         stasis_subscription_cb callback,
514         void *data)
515 {
516         return internal_stasis_subscribe(topic, callback, data, 1, 0);
517 }
518
519 struct stasis_subscription *stasis_subscribe_pool(
520         struct stasis_topic *topic,
521         stasis_subscription_cb callback,
522         void *data)
523 {
524         return internal_stasis_subscribe(topic, callback, data, 1, 1);
525 }
526
/*!
 * \internal
 * \brief Taskprocessor task that drops one reference on a subscription.
 * \param data The subscription to release.
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
	ao2_cleanup(data);

	return 0;
}
533
534 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
535 {
536         /* The subscription may be the last ref to this topic. Hold
537          * the topic ref open until after the unlock. */
538         RAII_VAR(struct stasis_topic *, topic,
539                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
540
541         if (!sub) {
542                 return NULL;
543         }
544
545         /* We have to remove the subscription first, to ensure the unsubscribe
546          * is the final message */
547         if (topic_remove_subscription(sub->topic, sub) != 0) {
548                 ast_log(LOG_ERROR,
549                         "Internal error: subscription has invalid topic\n");
550                 return NULL;
551         }
552
553         /* Now let everyone know about the unsubscribe */
554         send_subscription_unsubscribe(topic, sub);
555
556         /* When all that's done, remove the ref the mailbox has on the sub */
557         if (sub->mailbox) {
558                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
559         }
560
561         /* Unsubscribing unrefs the subscription */
562         ao2_cleanup(sub);
563         return NULL;
564 }
565
566 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
567         long low_water, long high_water)
568 {
569         int res = -1;
570
571         if (subscription) {
572                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
573                         low_water, high_water);
574         }
575         return res;
576 }
577
578 void stasis_subscription_join(struct stasis_subscription *subscription)
579 {
580         if (subscription) {
581                 SCOPED_AO2LOCK(lock, subscription);
582
583                 /* Wait until the processed flag has been set */
584                 while (!subscription->final_message_processed) {
585                         ast_cond_wait(&subscription->join_cond,
586                                 ao2_object_get_lockaddr(subscription));
587                 }
588         }
589 }
590
591 int stasis_subscription_is_done(struct stasis_subscription *subscription)
592 {
593         if (subscription) {
594                 SCOPED_AO2LOCK(lock, subscription);
595
596                 return subscription->final_message_rxed;
597         }
598
599         /* Null subscription is about as done as you can get */
600         return 1;
601 }
602
603 struct stasis_subscription *stasis_unsubscribe_and_join(
604         struct stasis_subscription *subscription)
605 {
606         if (!subscription) {
607                 return NULL;
608         }
609
610         /* Bump refcount to hold it past the unsubscribe */
611         ao2_ref(subscription, +1);
612         stasis_unsubscribe(subscription);
613         stasis_subscription_join(subscription);
614         /* Now decrement the refcount back */
615         ao2_cleanup(subscription);
616         return NULL;
617 }
618
619 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
620 {
621         if (sub) {
622                 size_t i;
623                 struct stasis_topic *topic = sub->topic;
624                 SCOPED_AO2LOCK(lock_topic, topic);
625
626                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
627                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
628                                 return 1;
629                         }
630                 }
631         }
632
633         return 0;
634 }
635
636 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
637 {
638         return sub->uniqueid;
639 }
640
641 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
642 {
643         struct stasis_subscription_change *change;
644
645         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
646                 return 0;
647         }
648
649         change = stasis_message_data(msg);
650         if (strcmp("Unsubscribe", change->description)) {
651                 return 0;
652         }
653
654         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
655                 return 0;
656         }
657
658         return 1;
659 }
660
661 /*!
662  * \brief Add a subscriber to a topic.
663  * \param topic Topic
664  * \param sub Subscriber
665  * \return 0 on success
666  * \return Non-zero on error
667  */
668 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
669 {
670         size_t idx;
671         SCOPED_AO2LOCK(lock, topic);
672
673         /* The reference from the topic to the subscription is shared with
674          * the owner of the subscription, which will explicitly unsubscribe
675          * to release it.
676          *
677          * If we bumped the refcount here, the owner would have to unsubscribe
678          * and cleanup, which is a bit awkward. */
679         AST_VECTOR_APPEND(&topic->subscribers, sub);
680
681         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
682                 topic_add_subscription(
683                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
684         }
685
686         return 0;
687 }
688
689 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
690 {
691         size_t idx;
692         SCOPED_AO2LOCK(lock_topic, topic);
693
694         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
695                 topic_remove_subscription(
696                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
697         }
698
699         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
700                 AST_VECTOR_ELEM_CLEANUP_NOOP);
701 }
702
703 /*!
704  * \internal \brief Dispatch a message to a subscriber asynchronously
705  * \param local \ref ast_taskprocessor_local object
706  * \return 0
707  */
708 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
709 {
710         struct stasis_subscription *sub = local->local_data;
711         struct stasis_message *message = local->data;
712
713         subscription_invoke(sub, message);
714         ao2_cleanup(message);
715
716         return 0;
717 }
718
/*!
 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
 * a published message to a subscriber
 *
 * The publisher allocates this on its stack, pushes it to the subscriber's
 * mailbox and waits on \c cond until \c complete is set.
 */
struct sync_task_data {
        /*! Mutex guarding \c complete; used together with \c cond */
        ast_mutex_t lock;
        /*! Signalled by dispatch_exec_sync() once the subscriber has been invoked */
        ast_cond_t cond;
        /*! Zero until dispatch_exec_sync() has finished, then set to 1 */
        int complete;
        /*! The message to deliver (read back as a struct stasis_message pointer) */
        void *task_data;
};
729
730 /*!
731  * \internal \brief Dispatch a message to a subscriber synchronously
732  * \param local \ref ast_taskprocessor_local object
733  * \return 0
734  */
735 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
736 {
737         struct stasis_subscription *sub = local->local_data;
738         struct sync_task_data *std = local->data;
739         struct stasis_message *message = std->task_data;
740
741         subscription_invoke(sub, message);
742         ao2_cleanup(message);
743
744         ast_mutex_lock(&std->lock);
745         std->complete = 1;
746         ast_cond_signal(&std->cond);
747         ast_mutex_unlock(&std->lock);
748
749         return 0;
750 }
751
752 /*!
753  * \internal \brief Dispatch a message to a subscriber
754  * \param sub The subscriber to dispatch to
755  * \param message The message to send
756  * \param synchronous If non-zero, synchronize on the subscriber receiving
757  * the message
758  */
759 static void dispatch_message(struct stasis_subscription *sub,
760         struct stasis_message *message,
761         int synchronous)
762 {
763         if (!sub->mailbox) {
764                 /* Dispatch directly */
765                 subscription_invoke(sub, message);
766                 return;
767         }
768
769         /* Bump the message for the taskprocessor push. This will get de-ref'd
770          * by the task processor callback.
771          */
772         ao2_bump(message);
773         if (!synchronous) {
774                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
775                         /* Push failed; ugh. */
776                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
777                         ao2_cleanup(message);
778                 }
779         } else {
780                 struct sync_task_data std;
781
782                 ast_mutex_init(&std.lock);
783                 ast_cond_init(&std.cond, NULL);
784                 std.complete = 0;
785                 std.task_data = message;
786
787                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
788                         /* Push failed; ugh. */
789                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
790                         ao2_cleanup(message);
791                         ast_mutex_destroy(&std.lock);
792                         ast_cond_destroy(&std.cond);
793                         return;
794                 }
795
796                 ast_mutex_lock(&std.lock);
797                 while (!std.complete) {
798                         ast_cond_wait(&std.cond, &std.lock);
799                 }
800                 ast_mutex_unlock(&std.lock);
801
802                 ast_mutex_destroy(&std.lock);
803                 ast_cond_destroy(&std.cond);
804         }
805 }
806
807 /*!
808  * \internal \brief Publish a message to a topic's subscribers
809  * \brief topic The topic to publish to
810  * \brief message The message to publish
811  * \brief sync_sub An optional subscriber of the topic to publish synchronously
812  * to
813  */
814 static void publish_msg(struct stasis_topic *topic,
815         struct stasis_message *message, struct stasis_subscription *sync_sub)
816 {
817         size_t i;
818
819         ast_assert(topic != NULL);
820         ast_assert(message != NULL);
821
822         /*
823          * The topic may be unref'ed by the subscription invocation.
824          * Make sure we hold onto a reference while dispatching.
825          */
826         ao2_ref(topic, +1);
827         ao2_lock(topic);
828         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
829                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
830
831                 ast_assert(sub != NULL);
832
833                 dispatch_message(sub, message, (sub == sync_sub));
834         }
835         ao2_unlock(topic);
836         ao2_ref(topic, -1);
837 }
838
839 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
840 {
841         publish_msg(topic, message, NULL);
842 }
843
844 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
845 {
846         ast_assert(sub != NULL);
847
848         publish_msg(sub->topic, message, sub);
849 }
850
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * In cases where both the \a from_topic and \a to_topic need to be locked,
 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
 *
 * Both members hold a reference that forward_dtor() releases. They are left
 * NULL when stasis_forward_all() is asked to forward a topic to itself.
 */
struct stasis_forward {
        /*! Originating topic */
        struct stasis_topic *from_topic;
        /*! Destination topic */
        struct stasis_topic *to_topic;
};
865
866 static void forward_dtor(void *obj)
867 {
868         struct stasis_forward *forward = obj;
869
870         ao2_cleanup(forward->from_topic);
871         forward->from_topic = NULL;
872         ao2_cleanup(forward->to_topic);
873         forward->to_topic = NULL;
874 }
875
876 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
877 {
878         int idx;
879         struct stasis_topic *from;
880         struct stasis_topic *to;
881
882         if (!forward) {
883                 return NULL;
884         }
885
886         from = forward->from_topic;
887         to = forward->to_topic;
888
889         if (from && to) {
890                 topic_lock_both(to, from);
891                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
892                         AST_VECTOR_ELEM_CLEANUP_NOOP);
893
894                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
895                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
896                 }
897                 ao2_unlock(from);
898                 ao2_unlock(to);
899         }
900
901         ao2_cleanup(forward);
902
903         return NULL;
904 }
905
906 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
907         struct stasis_topic *to_topic)
908 {
909         int res;
910         size_t idx;
911         struct stasis_forward *forward;
912
913         if (!from_topic || !to_topic) {
914                 return NULL;
915         }
916
917         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
918         if (!forward) {
919                 return NULL;
920         }
921
922         /* Forwards to ourselves are implicit. */
923         if (to_topic == from_topic) {
924                 return forward;
925         }
926
927         forward->from_topic = ao2_bump(from_topic);
928         forward->to_topic = ao2_bump(to_topic);
929
930         topic_lock_both(to_topic, from_topic);
931         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
932         if (res != 0) {
933                 ao2_unlock(from_topic);
934                 ao2_unlock(to_topic);
935                 ao2_ref(forward, -1);
936                 return NULL;
937         }
938
939         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
940                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
941         }
942         ao2_unlock(from_topic);
943         ao2_unlock(to_topic);
944
945         return forward;
946 }
947
948 static void subscription_change_dtor(void *obj)
949 {
950         struct stasis_subscription_change *change = obj;
951
952         ast_string_field_free_memory(change);
953         ao2_cleanup(change->topic);
954 }
955
956 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
957 {
958         struct stasis_subscription_change *change;
959
960         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
961         if (!change || ast_string_field_init(change, 128)) {
962                 ao2_cleanup(change);
963                 return NULL;
964         }
965
966         ast_string_field_set(change, uniqueid, uniqueid);
967         ast_string_field_set(change, description, description);
968         ao2_ref(topic, +1);
969         change->topic = topic;
970
971         return change;
972 }
973
974 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
975 {
976         struct stasis_subscription_change *change;
977         struct stasis_message *msg;
978
979         /* This assumes that we have already unsubscribed */
980         ast_assert(stasis_subscription_is_subscribed(sub));
981
982         if (!stasis_subscription_change_type()) {
983                 return;
984         }
985
986         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
987         if (!change) {
988                 return;
989         }
990
991         msg = stasis_message_create(stasis_subscription_change_type(), change);
992         if (!msg) {
993                 ao2_cleanup(change);
994                 return;
995         }
996
997         stasis_publish(topic, msg);
998         ao2_cleanup(msg);
999         ao2_cleanup(change);
1000 }
1001
1002 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1003         struct stasis_subscription *sub)
1004 {
1005         struct stasis_subscription_change *change;
1006         struct stasis_message *msg;
1007
1008         /* This assumes that we have already unsubscribed */
1009         ast_assert(!stasis_subscription_is_subscribed(sub));
1010
1011         if (!stasis_subscription_change_type()) {
1012                 return;
1013         }
1014
1015         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1016         if (!change) {
1017                 return;
1018         }
1019
1020         msg = stasis_message_create(stasis_subscription_change_type(), change);
1021         if (!msg) {
1022                 ao2_cleanup(change);
1023                 return;
1024         }
1025
1026         stasis_publish(topic, msg);
1027
1028         /* Now we have to dispatch to the subscription itself */
1029         dispatch_message(sub, msg, 0);
1030
1031         ao2_cleanup(msg);
1032         ao2_cleanup(change);
1033 }
1034
/*! \brief One pooled topic together with its forward to the pool's topic */
struct topic_pool_entry {
        /*! Forward from \c topic to the pool's topic; cancelled by topic_pool_entry_dtor() */
        struct stasis_forward *forward;
        /*! The pooled topic; its name is the key used by the pool container */
        struct stasis_topic *topic;
};
1039
1040 static void topic_pool_entry_dtor(void *obj)
1041 {
1042         struct topic_pool_entry *entry = obj;
1043
1044         entry->forward = stasis_forward_cancel(entry->forward);
1045         ao2_cleanup(entry->topic);
1046         entry->topic = NULL;
1047 }
1048
1049 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1050 {
1051         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1052                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1053 }
1054
/*! \brief A pool of named topics that all forward to one common topic */
struct stasis_topic_pool {
        /*! Container of \ref topic_pool_entry objects, hashed by topic name */
        struct ao2_container *pool_container;
        /*! Topic that every pooled topic forwards to */
        struct stasis_topic *pool_topic;
};
1059
1060 static void topic_pool_dtor(void *obj)
1061 {
1062         struct stasis_topic_pool *pool = obj;
1063
1064         ao2_cleanup(pool->pool_container);
1065         pool->pool_container = NULL;
1066         ao2_cleanup(pool->pool_topic);
1067         pool->pool_topic = NULL;
1068 }
1069
1070 static int topic_pool_entry_hash(const void *obj, const int flags)
1071 {
1072         const struct topic_pool_entry *object;
1073         const char *key;
1074
1075         switch (flags & OBJ_SEARCH_MASK) {
1076         case OBJ_SEARCH_KEY:
1077                 key = obj;
1078                 break;
1079         case OBJ_SEARCH_OBJECT:
1080                 object = obj;
1081                 key = stasis_topic_name(object->topic);
1082                 break;
1083         default:
1084                 /* Hash can only work on something with a full key. */
1085                 ast_assert(0);
1086                 return 0;
1087         }
1088         return ast_str_case_hash(key);
1089 }
1090
1091 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1092 {
1093         const struct topic_pool_entry *object_left = obj;
1094         const struct topic_pool_entry *object_right = arg;
1095         const char *right_key = arg;
1096         int cmp;
1097
1098         switch (flags & OBJ_SEARCH_MASK) {
1099         case OBJ_SEARCH_OBJECT:
1100                 right_key = stasis_topic_name(object_right->topic);
1101                 /* Fall through */
1102         case OBJ_SEARCH_KEY:
1103                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1104                 break;
1105         case OBJ_SEARCH_PARTIAL_KEY:
1106                 /* Not supported by container */
1107                 ast_assert(0);
1108                 cmp = -1;
1109                 break;
1110         default:
1111                 /*
1112                  * What arg points to is specific to this traversal callback
1113                  * and has no special meaning to astobj2.
1114                  */
1115                 cmp = 0;
1116                 break;
1117         }
1118         if (cmp) {
1119                 return 0;
1120         }
1121         /*
1122          * At this point the traversal callback is identical to a sorted
1123          * container.
1124          */
1125         return CMP_MATCH;
1126 }
1127
1128 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1129 {
1130         struct stasis_topic_pool *pool;
1131
1132         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1133         if (!pool) {
1134                 return NULL;
1135         }
1136
1137         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1138                 topic_pool_entry_hash, topic_pool_entry_cmp);
1139         if (!pool->pool_container) {
1140                 ao2_cleanup(pool);
1141                 return NULL;
1142         }
1143         ao2_ref(pooled_topic, +1);
1144         pool->pool_topic = pooled_topic;
1145
1146         return pool;
1147 }
1148
1149 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1150 {
1151         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1152         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1153
1154         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1155         if (topic_pool_entry) {
1156                 return topic_pool_entry->topic;
1157         }
1158
1159         topic_pool_entry = topic_pool_entry_alloc();
1160         if (!topic_pool_entry) {
1161                 return NULL;
1162         }
1163
1164         topic_pool_entry->topic = stasis_topic_create(topic_name);
1165         if (!topic_pool_entry->topic) {
1166                 return NULL;
1167         }
1168
1169         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1170         if (!topic_pool_entry->forward) {
1171                 return NULL;
1172         }
1173
1174         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1175                 return NULL;
1176         }
1177
1178         return topic_pool_entry->topic;
1179 }
1180
/*!
 * \brief Log use of a message type accessor outside of its valid lifetime.
 *
 * Only logs when built with AST_DEVMODE; otherwise this does nothing.
 *
 * \param name Name of the accessor function that was called.
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1187
/*!
 * \brief A multi object blob data structure to carry user event stasis messages
 *
 * The blob owns one reference to \c blob and one reference to every snapshot
 * stored in \c snapshots; multi_object_blob_dtor() releases them.
 */
struct ast_multi_object_blob {
        struct ast_json *blob;                             /*!< A blob of JSON data */
        AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*!< Vector of snapshots for each type */
};
1193
1194 /*!
1195  * \internal
1196  * \brief Destructor for \ref ast_multi_object_blob objects
1197  */
1198 static void multi_object_blob_dtor(void *obj)
1199 {
1200         struct ast_multi_object_blob *multi = obj;
1201         int type;
1202         int i;
1203
1204         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1205                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1206                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1207                 }
1208                 AST_VECTOR_FREE(&multi->snapshots[type]);
1209         }
1210         ast_json_unref(multi->blob);
1211 }
1212
1213 /*! \brief Create a stasis user event multi object blob */
1214 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1215 {
1216         int type;
1217         RAII_VAR(struct ast_multi_object_blob *, multi,
1218                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1219                         ao2_cleanup);
1220
1221         ast_assert(blob != NULL);
1222
1223         if (!multi) {
1224                 return NULL;
1225         }
1226
1227         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1228                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1229                         return NULL;
1230                 }
1231         }
1232
1233         multi->blob = ast_json_ref(blob);
1234
1235         ao2_ref(multi, +1);
1236         return multi;
1237 }
1238
1239 /*! \brief Add an object (snapshot) to the blob */
1240 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1241         enum stasis_user_multi_object_snapshot_type type, void *object)
1242 {
1243         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1244                 ao2_cleanup(object);
1245         }
1246 }
1247
1248 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1249 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1250         struct stasis_message_type *type, struct ast_json *blob)
1251 {
1252         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1253         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1254         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1255
1256         if (!type) {
1257                 return;
1258         }
1259
1260         multi = ast_multi_object_blob_create(blob);
1261         if (!multi) {
1262                 return;
1263         }
1264
1265         channel_snapshot = ast_channel_snapshot_create(chan);
1266         ao2_ref(channel_snapshot, +1);
1267         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1268
1269         message = stasis_message_create(type, multi);
1270         if (message) {
1271                 /* app_userevent still publishes to channel */
1272                 stasis_publish(ast_channel_topic(chan), message);
1273         }
1274 }
1275
1276 /*! \internal \brief convert multi object blob to ari json */
1277 static struct ast_json *multi_user_event_to_json(
1278         struct stasis_message *message,
1279         const struct stasis_message_sanitizer *sanitize)
1280 {
1281         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1282         struct ast_multi_object_blob *multi = stasis_message_data(message);
1283         struct ast_json *blob = multi->blob;
1284         const struct timeval *tv = stasis_message_timestamp(message);
1285         enum stasis_user_multi_object_snapshot_type type;
1286         int i;
1287
1288         out = ast_json_object_create();
1289         if (!out) {
1290                 return NULL;
1291         }
1292
1293         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1294         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1295         ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
1296         ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
1297
1298         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1299                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1300                         struct ast_json *json_object = NULL;
1301                         char *name = NULL;
1302                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1303
1304                         switch (type) {
1305                         case STASIS_UMOS_CHANNEL:
1306                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1307                                 name = "channel";
1308                                 break;
1309                         case STASIS_UMOS_BRIDGE:
1310                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1311                                 name = "bridge";
1312                                 break;
1313                         case STASIS_UMOS_ENDPOINT:
1314                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1315                                 name = "endpoint";
1316                                 break;
1317                         }
1318                         if (json_object) {
1319                                 ast_json_object_set(out, name, json_object);
1320                         }
1321                 }
1322         }
1323         return ast_json_ref(out);
1324 }
1325
1326 /*! \internal \brief convert multi object blob to ami string */
1327 static struct ast_str *multi_object_blob_to_ami(void *obj)
1328 {
1329         struct ast_str *ami_str=ast_str_create(1024);
1330         struct ast_str *ami_snapshot;
1331         const struct ast_multi_object_blob *multi = obj;
1332         enum stasis_user_multi_object_snapshot_type type;
1333         int i;
1334
1335         if (!ami_str) {
1336                 return NULL;
1337         }
1338         if (!multi) {
1339                 ast_free(ami_str);
1340                 return NULL;
1341         }
1342
1343         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1344                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1345                         char *name = NULL;
1346                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1347                         ami_snapshot = NULL;
1348
1349                         if (i > 0) {
1350                                 ast_asprintf(&name, "%d", i + 1);
1351                         }
1352
1353                         switch (type) {
1354                         case STASIS_UMOS_CHANNEL:
1355                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1356                                 break;
1357
1358                         case STASIS_UMOS_BRIDGE:
1359                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1360                                 break;
1361
1362                         case STASIS_UMOS_ENDPOINT:
1363                                 /* currently not sending endpoint snapshots to AMI */
1364                                 break;
1365                         }
1366                         if (ami_snapshot) {
1367                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1368                                 ast_free(ami_snapshot);
1369                         }
1370                         ast_free(name);
1371                 }
1372         }
1373
1374         return ami_str;
1375 }
1376
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key under consideration
 *
 * \retval 1 if \a key is exactly "eventname" (case-sensitive).
 * \retval 0 for any other key.
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0;
}
1385
1386 static struct ast_manager_event_blob *multi_user_event_to_ami(
1387         struct stasis_message *message)
1388 {
1389         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1390         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1391         struct ast_multi_object_blob *multi = stasis_message_data(message);
1392         const char *eventname;
1393
1394         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1395         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1396         object_string = multi_object_blob_to_ami(multi);
1397         if (!object_string || !body) {
1398                 return NULL;
1399         }
1400
1401         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1402                 "%s"
1403                 "UserEvent: %s\r\n"
1404                 "%s",
1405                 ast_str_buffer(object_string),
1406                 eventname,
1407                 ast_str_buffer(body));
1408 }
1409
/*! \brief A structure to hold global configuration-related options */
struct stasis_declined_config {
        /*! The list of message types to decline (a string container keyed by type name) */
        struct ao2_container *declined;
};
1415
/*! \brief Threadpool configuration options */
struct stasis_threadpool_conf {
        /*! Initial size of the thread pool */
        int initial_size;
        /*! Time, in seconds, before we expire a thread */
        int idle_timeout_sec;
        /*! Maximum number of threads to allow */
        int max_size;
};
1425
/*! \brief Top-level configuration object built by stasis_config_alloc() */
struct stasis_config {
        /*! Thread pool configuration options (released with ast_free()) */
        struct stasis_threadpool_conf *threadpool_options;
        /*! Declined message types (an ao2 object) */
        struct stasis_declined_config *declined_message_types;
};
1432
/*! \brief An aco_type structure to link the "threadpool" category to the stasis_threadpool_conf type */
static struct aco_type threadpool_option = {
        .type = ACO_GLOBAL,
        .name = "threadpool",
        .item_offset = offsetof(struct stasis_config, threadpool_options),
        .category = "threadpool",
        .category_match = ACO_WHITELIST_EXACT,
};

/*! \brief Type list holding only \ref threadpool_option (NOTE(review): presumably used when registering threadpool options -- confirm) */
static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1442
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
        .type = ACO_GLOBAL,
        .name = "declined_message_types",
        .item_offset = offsetof(struct stasis_config, declined_message_types),
        .category_match = ACO_WHITELIST_EXACT,
        .category = "declined_message_types",
};

/*! \brief Type list holding only \ref declined_option (NOTE(review): presumably used when registering the declined-type option -- confirm) */
struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1453
/*! \brief Description of stasis.conf: the file name and the category types it may contain */
struct aco_file stasis_conf = {
        .filename = "stasis.conf",
        .types = ACO_TYPES(&declined_option, &threadpool_option),
};
1458
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
static AO2_GLOBAL_OBJ_STATIC(globals);

/* Forward declaration: CONFIG_INFO_CORE below names the allocator before it is defined. */
static void *stasis_config_alloc(void);

/*! \brief Register information about the configs being processed by this module */
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
        .files = ACO_FILES(&stasis_conf),
);
1468
1469 static void stasis_declined_config_destructor(void *obj)
1470 {
1471         struct stasis_declined_config *declined = obj;
1472
1473         ao2_cleanup(declined->declined);
1474 }
1475
1476 static void stasis_config_destructor(void *obj)
1477 {
1478         struct stasis_config *cfg = obj;
1479
1480         ao2_cleanup(cfg->declined_message_types);
1481         ast_free(cfg->threadpool_options);
1482 }
1483
1484 static void *stasis_config_alloc(void)
1485 {
1486         struct stasis_config *cfg;
1487
1488         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1489                 return NULL;
1490         }
1491
1492         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1493         if (!cfg->threadpool_options) {
1494                 ao2_ref(cfg, -1);
1495                 return NULL;
1496         }
1497
1498         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1499                 stasis_declined_config_destructor);
1500         if (!cfg->declined_message_types) {
1501                 ao2_ref(cfg, -1);
1502                 return NULL;
1503         }
1504
1505         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1506         if (!cfg->declined_message_types->declined) {
1507                 ao2_ref(cfg, -1);
1508                 return NULL;
1509         }
1510
1511         return cfg;
1512 }
1513
1514 int stasis_message_type_declined(const char *name)
1515 {
1516         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1517         char *name_in_declined;
1518         int res;
1519
1520         if (!cfg || !cfg->declined_message_types) {
1521                 return 0;
1522         }
1523
1524         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1525         res = name_in_declined ? 1 : 0;
1526         ao2_cleanup(name_in_declined);
1527         if (res) {
1528                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1529         }
1530         return res;
1531 }
1532
1533 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1534 {
1535         struct stasis_declined_config *declined = obj;
1536
1537         if (ast_strlen_zero(var->value)) {
1538                 return 0;
1539         }
1540
1541         if (ast_str_container_add(declined->declined, var->value)) {
1542                 return -1;
1543         }
1544
1545         return 0;
1546 }
1547
1548 /*!
1549  * @{ \brief Define multi user event message type(s).
1550  */
1551
1552 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1553         .to_json = multi_user_event_to_json,
1554         .to_ami = multi_user_event_to_ami,
1555         );
1556
1557 /*! @} */
1558
1559 /*! \brief Cleanup function for graceful shutdowns */
1560 static void stasis_cleanup(void)
1561 {
1562         ast_threadpool_shutdown(pool);
1563         pool = NULL;
1564         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1565         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1566         aco_info_destroy(&cfg_info);
1567         ao2_global_obj_release(globals);
1568 }
1569
1570 int stasis_init(void)
1571 {
1572         RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
1573         int cache_init;
1574         struct ast_threadpool_options threadpool_opts = { 0, };
1575
1576         /* Be sure the types are cleaned up after the message bus */
1577         ast_register_cleanup(stasis_cleanup);
1578
1579         if (aco_info_init(&cfg_info)) {
1580                 return -1;
1581         }
1582
1583         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1584                 declined_options, "", declined_handler, 0);
1585         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1586                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1587                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1588                 INT_MAX);
1589         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1590                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1591                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1592                 INT_MAX);
1593         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1594                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1595                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1596                 INT_MAX);
1597
1598         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1599                 struct stasis_config *default_cfg = stasis_config_alloc();
1600
1601                 if (!default_cfg) {
1602                         return -1;
1603                 }
1604
1605                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1606                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1607                         ao2_ref(default_cfg, -1);
1608                         return -1;
1609                 }
1610
1611                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1612                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1613                         return -1;
1614                 }
1615
1616                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1617                 ao2_global_obj_replace_unref(globals, default_cfg);
1618                 cfg = default_cfg;
1619         } else {
1620                 cfg = ao2_global_obj_ref(globals);
1621                 if (!cfg) {
1622                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1623                         return -1;
1624                 }
1625         }
1626
1627         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1628         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1629         threadpool_opts.auto_increment = 1;
1630         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1631         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1632         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1633         if (!pool) {
1634                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1635                 return -1;
1636         }
1637
1638         cache_init = stasis_cache_init();
1639         if (cache_init != 0) {
1640                 return -1;
1641         }
1642
1643         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1644                 return -1;
1645         }
1646         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1647                 return -1;
1648         }
1649
1650         return 0;
1651 }