stasis: Remove silly usage of RAII_VAR.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subcription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has be explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there are a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how the
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
296 /*! Initial size of the subscribers list. */
297 #define INITIAL_SUBSCRIBERS_MAX 4
298
299 /*! The number of buckets to use for topic pools */
300 #define TOPIC_POOL_BUCKETS 57
301
302 /*! Thread pool for topics that don't want a dedicated taskprocessor */
303 static struct ast_threadpool *pool;
304
305 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
307 /*! \internal */
308 struct stasis_topic {
309         char *name;
310         /*! Variable length array of the subscribers */
311         AST_VECTOR(, struct stasis_subscription *) subscribers;
312
313         /*! Topics forwarding into this topic */
314         AST_VECTOR(, struct stasis_topic *) upstream_topics;
315 };
316
317 /* Forward declarations for the tightly-coupled subscription object */
318 static int topic_add_subscription(struct stasis_topic *topic,
319         struct stasis_subscription *sub);
320
321 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
322
323 /*! \brief Lock two topics. */
324 #define topic_lock_both(topic1, topic2) \
325         do { \
326                 ao2_lock(topic1); \
327                 while (ao2_trylock(topic2)) { \
328                         AO2_DEADLOCK_AVOIDANCE(topic1); \
329                 } \
330         } while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 /*! \internal */
374 struct stasis_subscription {
375         /*! Unique ID for this subscription */
376         char uniqueid[AST_UUID_STR_LEN];
377         /*! Topic subscribed to. */
378         struct stasis_topic *topic;
379         /*! Mailbox for processing incoming messages. */
380         struct ast_taskprocessor *mailbox;
381         /*! Callback function for incoming message processing. */
382         stasis_subscription_cb callback;
383         /*! Data pointer to be handed to the callback. */
384         void *data;
385
386         /*! Condition for joining with subscription. */
387         ast_cond_t join_cond;
388         /*! Flag set when final message for sub has been received.
389          *  Be sure join_lock is held before reading/setting. */
390         int final_message_rxed;
391         /*! Flag set when final message for sub has been processed.
392          *  Be sure join_lock is held before reading/setting. */
393         int final_message_processed;
394 };
395
396 static void subscription_dtor(void *obj)
397 {
398         struct stasis_subscription *sub = obj;
399
400         /* Subscriptions need to be manually unsubscribed before destruction
401          * b/c there's a cyclic reference between topics and subscriptions */
402         ast_assert(!stasis_subscription_is_subscribed(sub));
403         /* If there are any messages in flight to this subscription; that would
404          * be bad. */
405         ast_assert(stasis_subscription_is_done(sub));
406
407         ao2_cleanup(sub->topic);
408         sub->topic = NULL;
409         ast_taskprocessor_unreference(sub->mailbox);
410         sub->mailbox = NULL;
411         ast_cond_destroy(&sub->join_cond);
412 }
413
414 /*!
415  * \brief Invoke the subscription's callback.
416  * \param sub Subscription to invoke.
417  * \param topic Topic message was published to.
418  * \param message Message to send.
419  */
420 static void subscription_invoke(struct stasis_subscription *sub,
421                                   struct stasis_message *message)
422 {
423         /* Notify that the final message has been received */
424         if (stasis_subscription_final_message(sub, message)) {
425                 ao2_lock(sub);
426                 sub->final_message_rxed = 1;
427                 ast_cond_signal(&sub->join_cond);
428                 ao2_unlock(sub);
429         }
430
431         /* Since sub is mostly immutable, no need to lock sub */
432         sub->callback(sub->data, sub, message);
433
434         /* Notify that the final message has been processed */
435         if (stasis_subscription_final_message(sub, message)) {
436                 ao2_lock(sub);
437                 sub->final_message_processed = 1;
438                 ast_cond_signal(&sub->join_cond);
439                 ao2_unlock(sub);
440         }
441 }
442
443 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
444 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
445
446 void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
447 {
448 }
449
450 struct stasis_subscription *internal_stasis_subscribe(
451         struct stasis_topic *topic,
452         stasis_subscription_cb callback,
453         void *data,
454         int needs_mailbox,
455         int use_thread_pool)
456 {
457         struct stasis_subscription *sub;
458
459         if (!topic) {
460                 return NULL;
461         }
462
463         /* The ao2 lock is used for join_cond. */
464         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
465         if (!sub) {
466                 return NULL;
467         }
468         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
469
470         if (needs_mailbox) {
471                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
472
473                 /* Create name with seq number appended. */
474                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
475                         use_thread_pool ? 'p' : 'm',
476                         stasis_topic_name(topic));
477
478                 /*
479                  * With a small number of subscribers, a thread-per-sub is
480                  * acceptable. For a large number of subscribers, a thread
481                  * pool should be used.
482                  */
483                 if (use_thread_pool) {
484                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
485                 } else {
486                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
487                 }
488                 if (!sub->mailbox) {
489                         ao2_ref(sub, -1);
490
491                         return NULL;
492                 }
493                 ast_taskprocessor_set_local(sub->mailbox, sub);
494                 /* Taskprocessor has a reference */
495                 ao2_ref(sub, +1);
496         }
497
498         ao2_ref(topic, +1);
499         sub->topic = topic;
500         sub->callback = callback;
501         sub->data = data;
502         ast_cond_init(&sub->join_cond, NULL);
503
504         if (topic_add_subscription(topic, sub) != 0) {
505                 ao2_ref(sub, -1);
506
507                 return NULL;
508         }
509         send_subscription_subscribe(topic, sub);
510
511         return sub;
512 }
513
514 struct stasis_subscription *stasis_subscribe(
515         struct stasis_topic *topic,
516         stasis_subscription_cb callback,
517         void *data)
518 {
519         return internal_stasis_subscribe(topic, callback, data, 1, 0);
520 }
521
522 struct stasis_subscription *stasis_subscribe_pool(
523         struct stasis_topic *topic,
524         stasis_subscription_cb callback,
525         void *data)
526 {
527         return internal_stasis_subscribe(topic, callback, data, 1, 1);
528 }
529
530 static int sub_cleanup(void *data)
531 {
532         struct stasis_subscription *sub = data;
533         ao2_cleanup(sub);
534         return 0;
535 }
536
537 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
538 {
539         /* The subscription may be the last ref to this topic. Hold
540          * the topic ref open until after the unlock. */
541         struct stasis_topic *topic;
542
543         if (!sub) {
544                 return NULL;
545         }
546
547         topic = ao2_bump(sub->topic);
548
549         /* We have to remove the subscription first, to ensure the unsubscribe
550          * is the final message */
551         if (topic_remove_subscription(sub->topic, sub) != 0) {
552                 ast_log(LOG_ERROR,
553                         "Internal error: subscription has invalid topic\n");
554                 ao2_cleanup(topic);
555
556                 return NULL;
557         }
558
559         /* Now let everyone know about the unsubscribe */
560         send_subscription_unsubscribe(topic, sub);
561
562         /* When all that's done, remove the ref the mailbox has on the sub */
563         if (sub->mailbox) {
564                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
565         }
566
567         /* Unsubscribing unrefs the subscription */
568         ao2_cleanup(sub);
569         ao2_cleanup(topic);
570
571         return NULL;
572 }
573
574 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
575         long low_water, long high_water)
576 {
577         int res = -1;
578
579         if (subscription) {
580                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
581                         low_water, high_water);
582         }
583         return res;
584 }
585
586 void stasis_subscription_join(struct stasis_subscription *subscription)
587 {
588         if (subscription) {
589                 ao2_lock(subscription);
590                 /* Wait until the processed flag has been set */
591                 while (!subscription->final_message_processed) {
592                         ast_cond_wait(&subscription->join_cond,
593                                 ao2_object_get_lockaddr(subscription));
594                 }
595                 ao2_unlock(subscription);
596         }
597 }
598
599 int stasis_subscription_is_done(struct stasis_subscription *subscription)
600 {
601         if (subscription) {
602                 int ret;
603
604                 ao2_lock(subscription);
605                 ret = subscription->final_message_rxed;
606                 ao2_unlock(subscription);
607
608                 return ret;
609         }
610
611         /* Null subscription is about as done as you can get */
612         return 1;
613 }
614
615 struct stasis_subscription *stasis_unsubscribe_and_join(
616         struct stasis_subscription *subscription)
617 {
618         if (!subscription) {
619                 return NULL;
620         }
621
622         /* Bump refcount to hold it past the unsubscribe */
623         ao2_ref(subscription, +1);
624         stasis_unsubscribe(subscription);
625         stasis_subscription_join(subscription);
626         /* Now decrement the refcount back */
627         ao2_cleanup(subscription);
628         return NULL;
629 }
630
631 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
632 {
633         if (sub) {
634                 size_t i;
635                 struct stasis_topic *topic = sub->topic;
636
637                 ao2_lock(topic);
638                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
639                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
640                                 ao2_unlock(topic);
641                                 return 1;
642                         }
643                 }
644                 ao2_unlock(topic);
645         }
646
647         return 0;
648 }
649
650 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
651 {
652         return sub->uniqueid;
653 }
654
655 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
656 {
657         struct stasis_subscription_change *change;
658
659         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
660                 return 0;
661         }
662
663         change = stasis_message_data(msg);
664         if (strcmp("Unsubscribe", change->description)) {
665                 return 0;
666         }
667
668         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
669                 return 0;
670         }
671
672         return 1;
673 }
674
675 /*!
676  * \brief Add a subscriber to a topic.
677  * \param topic Topic
678  * \param sub Subscriber
679  * \return 0 on success
680  * \return Non-zero on error
681  */
682 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
683 {
684         size_t idx;
685
686         ao2_lock(topic);
687         /* The reference from the topic to the subscription is shared with
688          * the owner of the subscription, which will explicitly unsubscribe
689          * to release it.
690          *
691          * If we bumped the refcount here, the owner would have to unsubscribe
692          * and cleanup, which is a bit awkward. */
693         AST_VECTOR_APPEND(&topic->subscribers, sub);
694
695         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
696                 topic_add_subscription(
697                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
698         }
699         ao2_unlock(topic);
700
701         return 0;
702 }
703
704 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
705 {
706         size_t idx;
707         int res;
708
709         ao2_lock(topic);
710         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
711                 topic_remove_subscription(
712                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
713         }
714         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
715                 AST_VECTOR_ELEM_CLEANUP_NOOP);
716         ao2_unlock(topic);
717
718         return res;
719 }
720
721 /*!
722  * \internal \brief Dispatch a message to a subscriber asynchronously
723  * \param local \ref ast_taskprocessor_local object
724  * \return 0
725  */
726 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
727 {
728         struct stasis_subscription *sub = local->local_data;
729         struct stasis_message *message = local->data;
730
731         subscription_invoke(sub, message);
732         ao2_cleanup(message);
733
734         return 0;
735 }
736
737 /*!
738  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
739  * a published message to a subscriber
740  */
741 struct sync_task_data {
742         ast_mutex_t lock;
743         ast_cond_t cond;
744         int complete;
745         void *task_data;
746 };
747
748 /*!
749  * \internal \brief Dispatch a message to a subscriber synchronously
750  * \param local \ref ast_taskprocessor_local object
751  * \return 0
752  */
753 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
754 {
755         struct stasis_subscription *sub = local->local_data;
756         struct sync_task_data *std = local->data;
757         struct stasis_message *message = std->task_data;
758
759         subscription_invoke(sub, message);
760         ao2_cleanup(message);
761
762         ast_mutex_lock(&std->lock);
763         std->complete = 1;
764         ast_cond_signal(&std->cond);
765         ast_mutex_unlock(&std->lock);
766
767         return 0;
768 }
769
770 /*!
771  * \internal \brief Dispatch a message to a subscriber
772  * \param sub The subscriber to dispatch to
773  * \param message The message to send
774  * \param synchronous If non-zero, synchronize on the subscriber receiving
775  * the message
776  */
777 static void dispatch_message(struct stasis_subscription *sub,
778         struct stasis_message *message,
779         int synchronous)
780 {
781         if (!sub->mailbox) {
782                 /* Dispatch directly */
783                 subscription_invoke(sub, message);
784                 return;
785         }
786
787         /* Bump the message for the taskprocessor push. This will get de-ref'd
788          * by the task processor callback.
789          */
790         ao2_bump(message);
791         if (!synchronous) {
792                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
793                         /* Push failed; ugh. */
794                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
795                         ao2_cleanup(message);
796                 }
797         } else {
798                 struct sync_task_data std;
799
800                 ast_mutex_init(&std.lock);
801                 ast_cond_init(&std.cond, NULL);
802                 std.complete = 0;
803                 std.task_data = message;
804
805                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
806                         /* Push failed; ugh. */
807                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
808                         ao2_cleanup(message);
809                         ast_mutex_destroy(&std.lock);
810                         ast_cond_destroy(&std.cond);
811                         return;
812                 }
813
814                 ast_mutex_lock(&std.lock);
815                 while (!std.complete) {
816                         ast_cond_wait(&std.cond, &std.lock);
817                 }
818                 ast_mutex_unlock(&std.lock);
819
820                 ast_mutex_destroy(&std.lock);
821                 ast_cond_destroy(&std.cond);
822         }
823 }
824
825 /*!
826  * \internal \brief Publish a message to a topic's subscribers
827  * \brief topic The topic to publish to
828  * \brief message The message to publish
829  * \brief sync_sub An optional subscriber of the topic to publish synchronously
830  * to
831  */
832 static void publish_msg(struct stasis_topic *topic,
833         struct stasis_message *message, struct stasis_subscription *sync_sub)
834 {
835         size_t i;
836
837         ast_assert(topic != NULL);
838         ast_assert(message != NULL);
839
840         /*
841          * The topic may be unref'ed by the subscription invocation.
842          * Make sure we hold onto a reference while dispatching.
843          */
844         ao2_ref(topic, +1);
845         ao2_lock(topic);
846         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
847                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
848
849                 ast_assert(sub != NULL);
850
851                 dispatch_message(sub, message, (sub == sync_sub));
852         }
853         ao2_unlock(topic);
854         ao2_ref(topic, -1);
855 }
856
857 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
858 {
859         publish_msg(topic, message, NULL);
860 }
861
862 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
863 {
864         ast_assert(sub != NULL);
865
866         publish_msg(sub->topic, message, sub);
867 }
868
869 /*!
870  * \brief Forwarding information
871  *
872  * Any message posted to \a from_topic is forwarded to \a to_topic.
873  *
874  * In cases where both the \a from_topic and \a to_topic need to be locked,
875  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
876  */
877 struct stasis_forward {
878         /*! Originating topic */
879         struct stasis_topic *from_topic;
880         /*! Destination topic */
881         struct stasis_topic *to_topic;
882 };
883
884 static void forward_dtor(void *obj)
885 {
886         struct stasis_forward *forward = obj;
887
888         ao2_cleanup(forward->from_topic);
889         forward->from_topic = NULL;
890         ao2_cleanup(forward->to_topic);
891         forward->to_topic = NULL;
892 }
893
894 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
895 {
896         int idx;
897         struct stasis_topic *from;
898         struct stasis_topic *to;
899
900         if (!forward) {
901                 return NULL;
902         }
903
904         from = forward->from_topic;
905         to = forward->to_topic;
906
907         if (from && to) {
908                 topic_lock_both(to, from);
909                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
910                         AST_VECTOR_ELEM_CLEANUP_NOOP);
911
912                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
913                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
914                 }
915                 ao2_unlock(from);
916                 ao2_unlock(to);
917         }
918
919         ao2_cleanup(forward);
920
921         return NULL;
922 }
923
924 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
925         struct stasis_topic *to_topic)
926 {
927         int res;
928         size_t idx;
929         struct stasis_forward *forward;
930
931         if (!from_topic || !to_topic) {
932                 return NULL;
933         }
934
935         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
936         if (!forward) {
937                 return NULL;
938         }
939
940         /* Forwards to ourselves are implicit. */
941         if (to_topic == from_topic) {
942                 return forward;
943         }
944
945         forward->from_topic = ao2_bump(from_topic);
946         forward->to_topic = ao2_bump(to_topic);
947
948         topic_lock_both(to_topic, from_topic);
949         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
950         if (res != 0) {
951                 ao2_unlock(from_topic);
952                 ao2_unlock(to_topic);
953                 ao2_ref(forward, -1);
954                 return NULL;
955         }
956
957         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
958                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
959         }
960         ao2_unlock(from_topic);
961         ao2_unlock(to_topic);
962
963         return forward;
964 }
965
966 static void subscription_change_dtor(void *obj)
967 {
968         struct stasis_subscription_change *change = obj;
969
970         ast_string_field_free_memory(change);
971         ao2_cleanup(change->topic);
972 }
973
974 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
975 {
976         struct stasis_subscription_change *change;
977
978         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
979         if (!change || ast_string_field_init(change, 128)) {
980                 ao2_cleanup(change);
981                 return NULL;
982         }
983
984         ast_string_field_set(change, uniqueid, uniqueid);
985         ast_string_field_set(change, description, description);
986         ao2_ref(topic, +1);
987         change->topic = topic;
988
989         return change;
990 }
991
992 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
993 {
994         struct stasis_subscription_change *change;
995         struct stasis_message *msg;
996
997         /* This assumes that we have already unsubscribed */
998         ast_assert(stasis_subscription_is_subscribed(sub));
999
1000         if (!stasis_subscription_change_type()) {
1001                 return;
1002         }
1003
1004         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1005         if (!change) {
1006                 return;
1007         }
1008
1009         msg = stasis_message_create(stasis_subscription_change_type(), change);
1010         if (!msg) {
1011                 ao2_cleanup(change);
1012                 return;
1013         }
1014
1015         stasis_publish(topic, msg);
1016         ao2_cleanup(msg);
1017         ao2_cleanup(change);
1018 }
1019
1020 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1021         struct stasis_subscription *sub)
1022 {
1023         struct stasis_subscription_change *change;
1024         struct stasis_message *msg;
1025
1026         /* This assumes that we have already unsubscribed */
1027         ast_assert(!stasis_subscription_is_subscribed(sub));
1028
1029         if (!stasis_subscription_change_type()) {
1030                 return;
1031         }
1032
1033         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1034         if (!change) {
1035                 return;
1036         }
1037
1038         msg = stasis_message_create(stasis_subscription_change_type(), change);
1039         if (!msg) {
1040                 ao2_cleanup(change);
1041                 return;
1042         }
1043
1044         stasis_publish(topic, msg);
1045
1046         /* Now we have to dispatch to the subscription itself */
1047         dispatch_message(sub, msg, 0);
1048
1049         ao2_cleanup(msg);
1050         ao2_cleanup(change);
1051 }
1052
1053 struct topic_pool_entry {
1054         struct stasis_forward *forward;
1055         struct stasis_topic *topic;
1056 };
1057
1058 static void topic_pool_entry_dtor(void *obj)
1059 {
1060         struct topic_pool_entry *entry = obj;
1061
1062         entry->forward = stasis_forward_cancel(entry->forward);
1063         ao2_cleanup(entry->topic);
1064         entry->topic = NULL;
1065 }
1066
1067 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1068 {
1069         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1070                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1071 }
1072
1073 struct stasis_topic_pool {
1074         struct ao2_container *pool_container;
1075         struct stasis_topic *pool_topic;
1076 };
1077
1078 static void topic_pool_dtor(void *obj)
1079 {
1080         struct stasis_topic_pool *pool = obj;
1081
1082         ao2_cleanup(pool->pool_container);
1083         pool->pool_container = NULL;
1084         ao2_cleanup(pool->pool_topic);
1085         pool->pool_topic = NULL;
1086 }
1087
1088 static int topic_pool_entry_hash(const void *obj, const int flags)
1089 {
1090         const struct topic_pool_entry *object;
1091         const char *key;
1092
1093         switch (flags & OBJ_SEARCH_MASK) {
1094         case OBJ_SEARCH_KEY:
1095                 key = obj;
1096                 break;
1097         case OBJ_SEARCH_OBJECT:
1098                 object = obj;
1099                 key = stasis_topic_name(object->topic);
1100                 break;
1101         default:
1102                 /* Hash can only work on something with a full key. */
1103                 ast_assert(0);
1104                 return 0;
1105         }
1106         return ast_str_case_hash(key);
1107 }
1108
1109 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1110 {
1111         const struct topic_pool_entry *object_left = obj;
1112         const struct topic_pool_entry *object_right = arg;
1113         const char *right_key = arg;
1114         int cmp;
1115
1116         switch (flags & OBJ_SEARCH_MASK) {
1117         case OBJ_SEARCH_OBJECT:
1118                 right_key = stasis_topic_name(object_right->topic);
1119                 /* Fall through */
1120         case OBJ_SEARCH_KEY:
1121                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1122                 break;
1123         case OBJ_SEARCH_PARTIAL_KEY:
1124                 /* Not supported by container */
1125                 ast_assert(0);
1126                 cmp = -1;
1127                 break;
1128         default:
1129                 /*
1130                  * What arg points to is specific to this traversal callback
1131                  * and has no special meaning to astobj2.
1132                  */
1133                 cmp = 0;
1134                 break;
1135         }
1136         if (cmp) {
1137                 return 0;
1138         }
1139         /*
1140          * At this point the traversal callback is identical to a sorted
1141          * container.
1142          */
1143         return CMP_MATCH;
1144 }
1145
1146 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1147 {
1148         struct stasis_topic_pool *pool;
1149
1150         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1151         if (!pool) {
1152                 return NULL;
1153         }
1154
1155         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1156                 topic_pool_entry_hash, topic_pool_entry_cmp);
1157         if (!pool->pool_container) {
1158                 ao2_cleanup(pool);
1159                 return NULL;
1160         }
1161         ao2_ref(pooled_topic, +1);
1162         pool->pool_topic = pooled_topic;
1163
1164         return pool;
1165 }
1166
1167 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1168 {
1169         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1170         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1171
1172         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1173         if (topic_pool_entry) {
1174                 return topic_pool_entry->topic;
1175         }
1176
1177         topic_pool_entry = topic_pool_entry_alloc();
1178         if (!topic_pool_entry) {
1179                 return NULL;
1180         }
1181
1182         topic_pool_entry->topic = stasis_topic_create(topic_name);
1183         if (!topic_pool_entry->topic) {
1184                 return NULL;
1185         }
1186
1187         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1188         if (!topic_pool_entry->forward) {
1189                 return NULL;
1190         }
1191
1192         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1193                 return NULL;
1194         }
1195
1196         return topic_pool_entry->topic;
1197 }
1198
1199 void stasis_log_bad_type_access(const char *name)
1200 {
1201 #ifdef AST_DEVMODE
1202         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1203 #endif
1204 }
1205
1206 /*! \brief A multi object blob data structure to carry user event stasis messages */
1207 struct ast_multi_object_blob {
1208         struct ast_json *blob;                             /*< A blob of JSON data */
1209         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1210 };
1211
1212 /*!
1213  * \internal
1214  * \brief Destructor for \ref ast_multi_object_blob objects
1215  */
1216 static void multi_object_blob_dtor(void *obj)
1217 {
1218         struct ast_multi_object_blob *multi = obj;
1219         int type;
1220         int i;
1221
1222         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1223                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1224                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1225                 }
1226                 AST_VECTOR_FREE(&multi->snapshots[type]);
1227         }
1228         ast_json_unref(multi->blob);
1229 }
1230
1231 /*! \brief Create a stasis user event multi object blob */
1232 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1233 {
1234         int type;
1235         struct ast_multi_object_blob *multi;
1236
1237         ast_assert(blob != NULL);
1238
1239         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1240         if (!multi) {
1241                 return NULL;
1242         }
1243
1244         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1245                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1246                         ao2_ref(multi, -1);
1247
1248                         return NULL;
1249                 }
1250         }
1251
1252         multi->blob = ast_json_ref(blob);
1253
1254         return multi;
1255 }
1256
1257 /*! \brief Add an object (snapshot) to the blob */
1258 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1259         enum stasis_user_multi_object_snapshot_type type, void *object)
1260 {
1261         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1262                 ao2_cleanup(object);
1263         }
1264 }
1265
1266 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1267 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1268         struct stasis_message_type *type, struct ast_json *blob)
1269 {
1270         struct stasis_message *message;
1271         struct ast_channel_snapshot *channel_snapshot;
1272         struct ast_multi_object_blob *multi;
1273
1274         if (!type) {
1275                 return;
1276         }
1277
1278         multi = ast_multi_object_blob_create(blob);
1279         if (!multi) {
1280                 return;
1281         }
1282
1283         channel_snapshot = ast_channel_snapshot_create(chan);
1284         if (!channel_snapshot) {
1285                 ao2_ref(multi, -1);
1286                 return;
1287         }
1288
1289         /* this call steals the channel_snapshot reference */
1290         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1291
1292         message = stasis_message_create(type, multi);
1293         ao2_ref(multi, -1);
1294         if (message) {
1295                 /* app_userevent still publishes to channel */
1296                 stasis_publish(ast_channel_topic(chan), message);
1297                 ao2_ref(message, -1);
1298         }
1299 }
1300
1301 /*! \internal \brief convert multi object blob to ari json */
1302 static struct ast_json *multi_user_event_to_json(
1303         struct stasis_message *message,
1304         const struct stasis_message_sanitizer *sanitize)
1305 {
1306         struct ast_json *out;
1307         struct ast_multi_object_blob *multi = stasis_message_data(message);
1308         struct ast_json *blob = multi->blob;
1309         const struct timeval *tv = stasis_message_timestamp(message);
1310         enum stasis_user_multi_object_snapshot_type type;
1311         int i;
1312
1313         out = ast_json_object_create();
1314         if (!out) {
1315                 return NULL;
1316         }
1317
1318         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1319         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1320         ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
1321         ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
1322
1323         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1324                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1325                         struct ast_json *json_object = NULL;
1326                         char *name = NULL;
1327                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1328
1329                         switch (type) {
1330                         case STASIS_UMOS_CHANNEL:
1331                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1332                                 name = "channel";
1333                                 break;
1334                         case STASIS_UMOS_BRIDGE:
1335                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1336                                 name = "bridge";
1337                                 break;
1338                         case STASIS_UMOS_ENDPOINT:
1339                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1340                                 name = "endpoint";
1341                                 break;
1342                         }
1343                         if (json_object) {
1344                                 ast_json_object_set(out, name, json_object);
1345                         }
1346                 }
1347         }
1348
1349         return out;
1350 }
1351
1352 /*! \internal \brief convert multi object blob to ami string */
1353 static struct ast_str *multi_object_blob_to_ami(void *obj)
1354 {
1355         struct ast_str *ami_str=ast_str_create(1024);
1356         struct ast_str *ami_snapshot;
1357         const struct ast_multi_object_blob *multi = obj;
1358         enum stasis_user_multi_object_snapshot_type type;
1359         int i;
1360
1361         if (!ami_str) {
1362                 return NULL;
1363         }
1364         if (!multi) {
1365                 ast_free(ami_str);
1366                 return NULL;
1367         }
1368
1369         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1370                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1371                         char *name = NULL;
1372                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1373                         ami_snapshot = NULL;
1374
1375                         if (i > 0) {
1376                                 ast_asprintf(&name, "%d", i + 1);
1377                         }
1378
1379                         switch (type) {
1380                         case STASIS_UMOS_CHANNEL:
1381                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1382                                 break;
1383
1384                         case STASIS_UMOS_BRIDGE:
1385                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1386                                 break;
1387
1388                         case STASIS_UMOS_ENDPOINT:
1389                                 /* currently not sending endpoint snapshots to AMI */
1390                                 break;
1391                         }
1392                         if (ami_snapshot) {
1393                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1394                                 ast_free(ami_snapshot);
1395                         }
1396                         ast_free(name);
1397                 }
1398         }
1399
1400         return ami_str;
1401 }
1402
1403 /*! \internal \brief Callback to pass only user defined parameters from blob */
1404 static int userevent_exclusion_cb(const char *key)
1405 {
1406         if (!strcmp("eventname", key)) {
1407                 return 1;
1408         }
1409         return 0;
1410 }
1411
1412 static struct ast_manager_event_blob *multi_user_event_to_ami(
1413         struct stasis_message *message)
1414 {
1415         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1416         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1417         struct ast_multi_object_blob *multi = stasis_message_data(message);
1418         const char *eventname;
1419
1420         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1421         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1422         object_string = multi_object_blob_to_ami(multi);
1423         if (!object_string || !body) {
1424                 return NULL;
1425         }
1426
1427         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1428                 "%s"
1429                 "UserEvent: %s\r\n"
1430                 "%s",
1431                 ast_str_buffer(object_string),
1432                 eventname,
1433                 ast_str_buffer(body));
1434 }
1435
1436 /*! \brief A structure to hold global configuration-related options */
1437 struct stasis_declined_config {
1438         /*! The list of message types to decline */
1439         struct ao2_container *declined;
1440 };
1441
1442 /*! \brief Threadpool configuration options */
1443 struct stasis_threadpool_conf {
1444         /*! Initial size of the thread pool */
1445         int initial_size;
1446         /*! Time, in seconds, before we expire a thread */
1447         int idle_timeout_sec;
1448         /*! Maximum number of thread to allow */
1449         int max_size;
1450 };
1451
1452 struct stasis_config {
1453         /*! Thread pool configuration options */
1454         struct stasis_threadpool_conf *threadpool_options;
1455         /*! Declined message types */
1456         struct stasis_declined_config *declined_message_types;
1457 };
1458
1459 static struct aco_type threadpool_option = {
1460         .type = ACO_GLOBAL,
1461         .name = "threadpool",
1462         .item_offset = offsetof(struct stasis_config, threadpool_options),
1463         .category = "threadpool",
1464         .category_match = ACO_WHITELIST_EXACT,
1465 };
1466
1467 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1468
1469 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1470 static struct aco_type declined_option = {
1471         .type = ACO_GLOBAL,
1472         .name = "declined_message_types",
1473         .item_offset = offsetof(struct stasis_config, declined_message_types),
1474         .category_match = ACO_WHITELIST_EXACT,
1475         .category = "declined_message_types",
1476 };
1477
1478 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1479
1480 struct aco_file stasis_conf = {
1481         .filename = "stasis.conf",
1482         .types = ACO_TYPES(&declined_option, &threadpool_option),
1483 };
1484
1485 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1486 static AO2_GLOBAL_OBJ_STATIC(globals);
1487
1488 static void *stasis_config_alloc(void);
1489
1490 /*! \brief Register information about the configs being processed by this module */
1491 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1492         .files = ACO_FILES(&stasis_conf),
1493 );
1494
1495 static void stasis_declined_config_destructor(void *obj)
1496 {
1497         struct stasis_declined_config *declined = obj;
1498
1499         ao2_cleanup(declined->declined);
1500 }
1501
1502 static void stasis_config_destructor(void *obj)
1503 {
1504         struct stasis_config *cfg = obj;
1505
1506         ao2_cleanup(cfg->declined_message_types);
1507         ast_free(cfg->threadpool_options);
1508 }
1509
1510 static void *stasis_config_alloc(void)
1511 {
1512         struct stasis_config *cfg;
1513
1514         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1515                 return NULL;
1516         }
1517
1518         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1519         if (!cfg->threadpool_options) {
1520                 ao2_ref(cfg, -1);
1521                 return NULL;
1522         }
1523
1524         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1525                 stasis_declined_config_destructor);
1526         if (!cfg->declined_message_types) {
1527                 ao2_ref(cfg, -1);
1528                 return NULL;
1529         }
1530
1531         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1532         if (!cfg->declined_message_types->declined) {
1533                 ao2_ref(cfg, -1);
1534                 return NULL;
1535         }
1536
1537         return cfg;
1538 }
1539
1540 int stasis_message_type_declined(const char *name)
1541 {
1542         struct stasis_config *cfg = ao2_global_obj_ref(globals);
1543         char *name_in_declined;
1544         int res;
1545
1546         if (!cfg || !cfg->declined_message_types) {
1547                 ao2_cleanup(cfg);
1548                 return 0;
1549         }
1550
1551         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1552         res = name_in_declined ? 1 : 0;
1553         ao2_cleanup(name_in_declined);
1554         ao2_ref(cfg, -1);
1555         if (res) {
1556                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1557         }
1558         return res;
1559 }
1560
1561 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1562 {
1563         struct stasis_declined_config *declined = obj;
1564
1565         if (ast_strlen_zero(var->value)) {
1566                 return 0;
1567         }
1568
1569         if (ast_str_container_add(declined->declined, var->value)) {
1570                 return -1;
1571         }
1572
1573         return 0;
1574 }
1575
1576 /*!
1577  * @{ \brief Define multi user event message type(s).
1578  */
1579
1580 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1581         .to_json = multi_user_event_to_json,
1582         .to_ami = multi_user_event_to_ami,
1583         );
1584
1585 /*! @} */
1586
1587 /*! \brief Cleanup function for graceful shutdowns */
1588 static void stasis_cleanup(void)
1589 {
1590         ast_threadpool_shutdown(pool);
1591         pool = NULL;
1592         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1593         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1594         aco_info_destroy(&cfg_info);
1595         ao2_global_obj_release(globals);
1596 }
1597
1598 int stasis_init(void)
1599 {
1600         struct stasis_config *cfg;
1601         int cache_init;
1602         struct ast_threadpool_options threadpool_opts = { 0, };
1603
1604         /* Be sure the types are cleaned up after the message bus */
1605         ast_register_cleanup(stasis_cleanup);
1606
1607         if (aco_info_init(&cfg_info)) {
1608                 return -1;
1609         }
1610
1611         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1612                 declined_options, "", declined_handler, 0);
1613         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1614                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1615                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1616                 INT_MAX);
1617         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1618                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1619                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1620                 INT_MAX);
1621         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1622                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1623                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1624                 INT_MAX);
1625
1626         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1627                 struct stasis_config *default_cfg = stasis_config_alloc();
1628
1629                 if (!default_cfg) {
1630                         return -1;
1631                 }
1632
1633                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1634                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1635                         ao2_ref(default_cfg, -1);
1636
1637                         return -1;
1638                 }
1639
1640                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1641                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1642                         ao2_ref(default_cfg, -1);
1643
1644                         return -1;
1645                 }
1646
1647                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1648                 ao2_global_obj_replace_unref(globals, default_cfg);
1649                 cfg = default_cfg;
1650         } else {
1651                 cfg = ao2_global_obj_ref(globals);
1652                 if (!cfg) {
1653                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1654
1655                         return -1;
1656                 }
1657         }
1658
1659         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1660         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1661         threadpool_opts.auto_increment = 1;
1662         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1663         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1664         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1665         ao2_ref(cfg, -1);
1666         if (!pool) {
1667                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1668
1669                 return -1;
1670         }
1671
1672         cache_init = stasis_cache_init();
1673         if (cache_init != 0) {
1674                 return -1;
1675         }
1676
1677         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1678                 return -1;
1679         }
1680         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1681                 return -1;
1682         }
1683
1684         return 0;
1685 }