devicestate: Don't create topic when change isn't cached.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subscription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has been explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there is a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how they
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
296 /*! Initial size of the subscribers list. */
297 #define INITIAL_SUBSCRIBERS_MAX 4
298
299 /*! The number of buckets to use for topic pools */
300 #define TOPIC_POOL_BUCKETS 57
301
302 /*! Thread pool for topics that don't want a dedicated taskprocessor */
303 static struct ast_threadpool *pool;
304
305 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
307 /*! \internal */
308 struct stasis_topic {
309         char *name;
310         /*! Variable length array of the subscribers */
311         AST_VECTOR(, struct stasis_subscription *) subscribers;
312
313         /*! Topics forwarding into this topic */
314         AST_VECTOR(, struct stasis_topic *) upstream_topics;
315 };
316
317 /* Forward declarations for the tightly-coupled subscription object */
318 static int topic_add_subscription(struct stasis_topic *topic,
319         struct stasis_subscription *sub);
320
321 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
322
/*!
 * \brief Acquire the locks of two topics.
 *
 * \a topic1 is locked unconditionally. \a topic2 is then acquired with a
 * trylock, invoking AO2_DEADLOCK_AVOIDANCE() on \a topic1 after each failed
 * attempt. Both arguments are expanded more than once, so they must be free
 * of side effects.
 */
#define topic_lock_both(topic1, topic2) \
        do { \
                ao2_lock(topic1); \
                for (;;) { \
                        if (!ao2_trylock(topic2)) { \
                                break; \
                        } \
                        AO2_DEADLOCK_AVOIDANCE(topic1); \
                } \
        } while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 /*! \internal */
374 struct stasis_subscription {
375         /*! Unique ID for this subscription */
376         char uniqueid[AST_UUID_STR_LEN];
377         /*! Topic subscribed to. */
378         struct stasis_topic *topic;
379         /*! Mailbox for processing incoming messages. */
380         struct ast_taskprocessor *mailbox;
381         /*! Callback function for incoming message processing. */
382         stasis_subscription_cb callback;
383         /*! Data pointer to be handed to the callback. */
384         void *data;
385
386         /*! Condition for joining with subscription. */
387         ast_cond_t join_cond;
388         /*! Flag set when final message for sub has been received.
389          *  Be sure join_lock is held before reading/setting. */
390         int final_message_rxed;
391         /*! Flag set when final message for sub has been processed.
392          *  Be sure join_lock is held before reading/setting. */
393         int final_message_processed;
394 };
395
396 static void subscription_dtor(void *obj)
397 {
398         struct stasis_subscription *sub = obj;
399
400         /* Subscriptions need to be manually unsubscribed before destruction
401          * b/c there's a cyclic reference between topics and subscriptions */
402         ast_assert(!stasis_subscription_is_subscribed(sub));
403         /* If there are any messages in flight to this subscription; that would
404          * be bad. */
405         ast_assert(stasis_subscription_is_done(sub));
406
407         ao2_cleanup(sub->topic);
408         sub->topic = NULL;
409         ast_taskprocessor_unreference(sub->mailbox);
410         sub->mailbox = NULL;
411         ast_cond_destroy(&sub->join_cond);
412 }
413
414 /*!
415  * \brief Invoke the subscription's callback.
416  * \param sub Subscription to invoke.
417  * \param topic Topic message was published to.
418  * \param message Message to send.
419  */
420 static void subscription_invoke(struct stasis_subscription *sub,
421                                   struct stasis_message *message)
422 {
423         /* Notify that the final message has been received */
424         if (stasis_subscription_final_message(sub, message)) {
425                 ao2_lock(sub);
426                 sub->final_message_rxed = 1;
427                 ast_cond_signal(&sub->join_cond);
428                 ao2_unlock(sub);
429         }
430
431         /* Since sub is mostly immutable, no need to lock sub */
432         sub->callback(sub->data, sub, message);
433
434         /* Notify that the final message has been processed */
435         if (stasis_subscription_final_message(sub, message)) {
436                 ao2_lock(sub);
437                 sub->final_message_processed = 1;
438                 ast_cond_signal(&sub->join_cond);
439                 ao2_unlock(sub);
440         }
441 }
442
443 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
444 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
445
/*!
 * \brief Subscription callback that ignores every message it is given.
 *
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
        /* Intentionally does nothing. */
        (void) data;
        (void) sub;
        (void) message;
}
449
450 struct stasis_subscription *internal_stasis_subscribe(
451         struct stasis_topic *topic,
452         stasis_subscription_cb callback,
453         void *data,
454         int needs_mailbox,
455         int use_thread_pool)
456 {
457         struct stasis_subscription *sub;
458
459         if (!topic) {
460                 return NULL;
461         }
462
463         /* The ao2 lock is used for join_cond. */
464         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
465         if (!sub) {
466                 return NULL;
467         }
468         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
469
470         if (needs_mailbox) {
471                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
472
473                 /* Create name with seq number appended. */
474                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
475                         use_thread_pool ? 'p' : 'm',
476                         stasis_topic_name(topic));
477
478                 /*
479                  * With a small number of subscribers, a thread-per-sub is
480                  * acceptable. For a large number of subscribers, a thread
481                  * pool should be used.
482                  */
483                 if (use_thread_pool) {
484                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
485                 } else {
486                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
487                 }
488                 if (!sub->mailbox) {
489                         ao2_ref(sub, -1);
490
491                         return NULL;
492                 }
493                 ast_taskprocessor_set_local(sub->mailbox, sub);
494                 /* Taskprocessor has a reference */
495                 ao2_ref(sub, +1);
496         }
497
498         ao2_ref(topic, +1);
499         sub->topic = topic;
500         sub->callback = callback;
501         sub->data = data;
502         ast_cond_init(&sub->join_cond, NULL);
503
504         if (topic_add_subscription(topic, sub) != 0) {
505                 ao2_ref(sub, -1);
506
507                 return NULL;
508         }
509         send_subscription_subscribe(topic, sub);
510
511         return sub;
512 }
513
514 struct stasis_subscription *stasis_subscribe(
515         struct stasis_topic *topic,
516         stasis_subscription_cb callback,
517         void *data)
518 {
519         return internal_stasis_subscribe(topic, callback, data, 1, 0);
520 }
521
522 struct stasis_subscription *stasis_subscribe_pool(
523         struct stasis_topic *topic,
524         stasis_subscription_cb callback,
525         void *data)
526 {
527         return internal_stasis_subscribe(topic, callback, data, 1, 1);
528 }
529
/*!
 * \internal
 * \brief Task that releases one reference on a subscription.
 *
 * \param data The struct stasis_subscription to release.
 *
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
        struct stasis_subscription *held = data;

        ao2_cleanup(held);

        return 0;
}
536
537 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
538 {
539         /* The subscription may be the last ref to this topic. Hold
540          * the topic ref open until after the unlock. */
541         struct stasis_topic *topic;
542
543         if (!sub) {
544                 return NULL;
545         }
546
547         topic = ao2_bump(sub->topic);
548
549         /* We have to remove the subscription first, to ensure the unsubscribe
550          * is the final message */
551         if (topic_remove_subscription(sub->topic, sub) != 0) {
552                 ast_log(LOG_ERROR,
553                         "Internal error: subscription has invalid topic\n");
554                 ao2_cleanup(topic);
555
556                 return NULL;
557         }
558
559         /* Now let everyone know about the unsubscribe */
560         send_subscription_unsubscribe(topic, sub);
561
562         /* When all that's done, remove the ref the mailbox has on the sub */
563         if (sub->mailbox) {
564                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
565         }
566
567         /* Unsubscribing unrefs the subscription */
568         ao2_cleanup(sub);
569         ao2_cleanup(topic);
570
571         return NULL;
572 }
573
574 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
575         long low_water, long high_water)
576 {
577         int res = -1;
578
579         if (subscription) {
580                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
581                         low_water, high_water);
582         }
583         return res;
584 }
585
586 void stasis_subscription_join(struct stasis_subscription *subscription)
587 {
588         if (subscription) {
589                 ao2_lock(subscription);
590                 /* Wait until the processed flag has been set */
591                 while (!subscription->final_message_processed) {
592                         ast_cond_wait(&subscription->join_cond,
593                                 ao2_object_get_lockaddr(subscription));
594                 }
595                 ao2_unlock(subscription);
596         }
597 }
598
599 int stasis_subscription_is_done(struct stasis_subscription *subscription)
600 {
601         if (subscription) {
602                 int ret;
603
604                 ao2_lock(subscription);
605                 ret = subscription->final_message_rxed;
606                 ao2_unlock(subscription);
607
608                 return ret;
609         }
610
611         /* Null subscription is about as done as you can get */
612         return 1;
613 }
614
615 struct stasis_subscription *stasis_unsubscribe_and_join(
616         struct stasis_subscription *subscription)
617 {
618         if (!subscription) {
619                 return NULL;
620         }
621
622         /* Bump refcount to hold it past the unsubscribe */
623         ao2_ref(subscription, +1);
624         stasis_unsubscribe(subscription);
625         stasis_subscription_join(subscription);
626         /* Now decrement the refcount back */
627         ao2_cleanup(subscription);
628         return NULL;
629 }
630
631 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
632 {
633         if (sub) {
634                 size_t i;
635                 struct stasis_topic *topic = sub->topic;
636
637                 ao2_lock(topic);
638                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
639                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
640                                 ao2_unlock(topic);
641                                 return 1;
642                         }
643                 }
644                 ao2_unlock(topic);
645         }
646
647         return 0;
648 }
649
650 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
651 {
652         return sub->uniqueid;
653 }
654
655 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
656 {
657         struct stasis_subscription_change *change;
658
659         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
660                 return 0;
661         }
662
663         change = stasis_message_data(msg);
664         if (strcmp("Unsubscribe", change->description)) {
665                 return 0;
666         }
667
668         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
669                 return 0;
670         }
671
672         return 1;
673 }
674
675 /*!
676  * \brief Add a subscriber to a topic.
677  * \param topic Topic
678  * \param sub Subscriber
679  * \return 0 on success
680  * \return Non-zero on error
681  */
682 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
683 {
684         size_t idx;
685
686         ao2_lock(topic);
687         /* The reference from the topic to the subscription is shared with
688          * the owner of the subscription, which will explicitly unsubscribe
689          * to release it.
690          *
691          * If we bumped the refcount here, the owner would have to unsubscribe
692          * and cleanup, which is a bit awkward. */
693         AST_VECTOR_APPEND(&topic->subscribers, sub);
694
695         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
696                 topic_add_subscription(
697                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
698         }
699         ao2_unlock(topic);
700
701         return 0;
702 }
703
704 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
705 {
706         size_t idx;
707         int res;
708
709         ao2_lock(topic);
710         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
711                 topic_remove_subscription(
712                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
713         }
714         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
715                 AST_VECTOR_ELEM_CLEANUP_NOOP);
716         ao2_unlock(topic);
717
718         return res;
719 }
720
721 /*!
722  * \internal \brief Dispatch a message to a subscriber asynchronously
723  * \param local \ref ast_taskprocessor_local object
724  * \return 0
725  */
726 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
727 {
728         struct stasis_subscription *sub = local->local_data;
729         struct stasis_message *message = local->data;
730
731         subscription_invoke(sub, message);
732         ao2_cleanup(message);
733
734         return 0;
735 }
736
737 /*!
738  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
739  * a published message to a subscriber
740  */
741 struct sync_task_data {
742         ast_mutex_t lock;
743         ast_cond_t cond;
744         int complete;
745         void *task_data;
746 };
747
748 /*!
749  * \internal \brief Dispatch a message to a subscriber synchronously
750  * \param local \ref ast_taskprocessor_local object
751  * \return 0
752  */
753 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
754 {
755         struct stasis_subscription *sub = local->local_data;
756         struct sync_task_data *std = local->data;
757         struct stasis_message *message = std->task_data;
758
759         subscription_invoke(sub, message);
760         ao2_cleanup(message);
761
762         ast_mutex_lock(&std->lock);
763         std->complete = 1;
764         ast_cond_signal(&std->cond);
765         ast_mutex_unlock(&std->lock);
766
767         return 0;
768 }
769
770 /*!
771  * \internal \brief Dispatch a message to a subscriber
772  * \param sub The subscriber to dispatch to
773  * \param message The message to send
774  * \param synchronous If non-zero, synchronize on the subscriber receiving
775  * the message
776  */
777 static void dispatch_message(struct stasis_subscription *sub,
778         struct stasis_message *message,
779         int synchronous)
780 {
781         if (!sub->mailbox) {
782                 /* Dispatch directly */
783                 subscription_invoke(sub, message);
784                 return;
785         }
786
787         /* Bump the message for the taskprocessor push. This will get de-ref'd
788          * by the task processor callback.
789          */
790         ao2_bump(message);
791         if (!synchronous) {
792                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
793                         /* Push failed; ugh. */
794                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
795                         ao2_cleanup(message);
796                 }
797         } else {
798                 struct sync_task_data std;
799
800                 ast_mutex_init(&std.lock);
801                 ast_cond_init(&std.cond, NULL);
802                 std.complete = 0;
803                 std.task_data = message;
804
805                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
806                         /* Push failed; ugh. */
807                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
808                         ao2_cleanup(message);
809                         ast_mutex_destroy(&std.lock);
810                         ast_cond_destroy(&std.cond);
811                         return;
812                 }
813
814                 ast_mutex_lock(&std.lock);
815                 while (!std.complete) {
816                         ast_cond_wait(&std.cond, &std.lock);
817                 }
818                 ast_mutex_unlock(&std.lock);
819
820                 ast_mutex_destroy(&std.lock);
821                 ast_cond_destroy(&std.cond);
822         }
823 }
824
825 /*!
826  * \internal \brief Publish a message to a topic's subscribers
827  * \brief topic The topic to publish to
828  * \brief message The message to publish
829  * \brief sync_sub An optional subscriber of the topic to publish synchronously
830  * to
831  */
832 static void publish_msg(struct stasis_topic *topic,
833         struct stasis_message *message, struct stasis_subscription *sync_sub)
834 {
835         size_t i;
836
837         ast_assert(topic != NULL);
838         ast_assert(message != NULL);
839
840         /*
841          * The topic may be unref'ed by the subscription invocation.
842          * Make sure we hold onto a reference while dispatching.
843          */
844         ao2_ref(topic, +1);
845         ao2_lock(topic);
846         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
847                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
848
849                 ast_assert(sub != NULL);
850
851                 dispatch_message(sub, message, (sub == sync_sub));
852         }
853         ao2_unlock(topic);
854         ao2_ref(topic, -1);
855 }
856
/*!
 * \brief Publish a message to a topic, with no synchronous subscriber.
 *
 * \param topic Topic to publish to
 * \param message Message to publish
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_subscription *no_sync_sub = NULL;

	publish_msg(topic, message, no_sync_sub);
}
861
862 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
863 {
864         ast_assert(sub != NULL);
865
866         publish_msg(sub->topic, message, sub);
867 }
868
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * Lock ordering: whenever both topics must be locked, take the lock on
 * \a to_topic first and the lock on \a from_topic second, otherwise a
 * deadlock is possible.
 */
struct stasis_forward {
	/*! Topic the messages originate from */
	struct stasis_topic *from_topic;
	/*! Topic the messages are forwarded to */
	struct stasis_topic *to_topic;
};
883
884 static void forward_dtor(void *obj)
885 {
886         struct stasis_forward *forward = obj;
887
888         ao2_cleanup(forward->from_topic);
889         forward->from_topic = NULL;
890         ao2_cleanup(forward->to_topic);
891         forward->to_topic = NULL;
892 }
893
894 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
895 {
896         int idx;
897         struct stasis_topic *from;
898         struct stasis_topic *to;
899
900         if (!forward) {
901                 return NULL;
902         }
903
904         from = forward->from_topic;
905         to = forward->to_topic;
906
907         if (from && to) {
908                 topic_lock_both(to, from);
909                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
910                         AST_VECTOR_ELEM_CLEANUP_NOOP);
911
912                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
913                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
914                 }
915                 ao2_unlock(from);
916                 ao2_unlock(to);
917         }
918
919         ao2_cleanup(forward);
920
921         return NULL;
922 }
923
924 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
925         struct stasis_topic *to_topic)
926 {
927         int res;
928         size_t idx;
929         struct stasis_forward *forward;
930
931         if (!from_topic || !to_topic) {
932                 return NULL;
933         }
934
935         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
936         if (!forward) {
937                 return NULL;
938         }
939
940         /* Forwards to ourselves are implicit. */
941         if (to_topic == from_topic) {
942                 return forward;
943         }
944
945         forward->from_topic = ao2_bump(from_topic);
946         forward->to_topic = ao2_bump(to_topic);
947
948         topic_lock_both(to_topic, from_topic);
949         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
950         if (res != 0) {
951                 ao2_unlock(from_topic);
952                 ao2_unlock(to_topic);
953                 ao2_ref(forward, -1);
954                 return NULL;
955         }
956
957         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
958                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
959         }
960         ao2_unlock(from_topic);
961         ao2_unlock(to_topic);
962
963         return forward;
964 }
965
966 static void subscription_change_dtor(void *obj)
967 {
968         struct stasis_subscription_change *change = obj;
969
970         ast_string_field_free_memory(change);
971         ao2_cleanup(change->topic);
972 }
973
974 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
975 {
976         struct stasis_subscription_change *change;
977
978         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
979         if (!change || ast_string_field_init(change, 128)) {
980                 ao2_cleanup(change);
981                 return NULL;
982         }
983
984         ast_string_field_set(change, uniqueid, uniqueid);
985         ast_string_field_set(change, description, description);
986         ao2_ref(topic, +1);
987         change->topic = topic;
988
989         return change;
990 }
991
992 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
993 {
994         struct stasis_subscription_change *change;
995         struct stasis_message *msg;
996
997         /* This assumes that we have already unsubscribed */
998         ast_assert(stasis_subscription_is_subscribed(sub));
999
1000         if (!stasis_subscription_change_type()) {
1001                 return;
1002         }
1003
1004         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1005         if (!change) {
1006                 return;
1007         }
1008
1009         msg = stasis_message_create(stasis_subscription_change_type(), change);
1010         if (!msg) {
1011                 ao2_cleanup(change);
1012                 return;
1013         }
1014
1015         stasis_publish(topic, msg);
1016         ao2_cleanup(msg);
1017         ao2_cleanup(change);
1018 }
1019
1020 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1021         struct stasis_subscription *sub)
1022 {
1023         struct stasis_subscription_change *change;
1024         struct stasis_message *msg;
1025
1026         /* This assumes that we have already unsubscribed */
1027         ast_assert(!stasis_subscription_is_subscribed(sub));
1028
1029         if (!stasis_subscription_change_type()) {
1030                 return;
1031         }
1032
1033         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1034         if (!change) {
1035                 return;
1036         }
1037
1038         msg = stasis_message_create(stasis_subscription_change_type(), change);
1039         if (!msg) {
1040                 ao2_cleanup(change);
1041                 return;
1042         }
1043
1044         stasis_publish(topic, msg);
1045
1046         /* Now we have to dispatch to the subscription itself */
1047         dispatch_message(sub, msg, 0);
1048
1049         ao2_cleanup(msg);
1050         ao2_cleanup(change);
1051 }
1052
/*! \brief One named topic held in a \ref stasis_topic_pool */
struct topic_pool_entry {
	/*! Forward from \a topic to the pool's topic */
	struct stasis_forward *forward;
	/*! The pooled topic itself */
	struct stasis_topic *topic;
};
1057
1058 static void topic_pool_entry_dtor(void *obj)
1059 {
1060         struct topic_pool_entry *entry = obj;
1061
1062         entry->forward = stasis_forward_cancel(entry->forward);
1063         ao2_cleanup(entry->topic);
1064         entry->topic = NULL;
1065 }
1066
1067 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1068 {
1069         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1070                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1071 }
1072
/*! \brief A set of named topics that all forward to one common topic */
struct stasis_topic_pool {
	/*! Container of \ref topic_pool_entry objects */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic forwards to */
	struct stasis_topic *pool_topic;
};
1077
1078 static void topic_pool_dtor(void *obj)
1079 {
1080         struct stasis_topic_pool *pool = obj;
1081
1082         ao2_cleanup(pool->pool_container);
1083         pool->pool_container = NULL;
1084         ao2_cleanup(pool->pool_topic);
1085         pool->pool_topic = NULL;
1086 }
1087
1088 static int topic_pool_entry_hash(const void *obj, const int flags)
1089 {
1090         const struct topic_pool_entry *object;
1091         const char *key;
1092
1093         switch (flags & OBJ_SEARCH_MASK) {
1094         case OBJ_SEARCH_KEY:
1095                 key = obj;
1096                 break;
1097         case OBJ_SEARCH_OBJECT:
1098                 object = obj;
1099                 key = stasis_topic_name(object->topic);
1100                 break;
1101         default:
1102                 /* Hash can only work on something with a full key. */
1103                 ast_assert(0);
1104                 return 0;
1105         }
1106         return ast_str_case_hash(key);
1107 }
1108
1109 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1110 {
1111         const struct topic_pool_entry *object_left = obj;
1112         const struct topic_pool_entry *object_right = arg;
1113         const char *right_key = arg;
1114         int cmp;
1115
1116         switch (flags & OBJ_SEARCH_MASK) {
1117         case OBJ_SEARCH_OBJECT:
1118                 right_key = stasis_topic_name(object_right->topic);
1119                 /* Fall through */
1120         case OBJ_SEARCH_KEY:
1121                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1122                 break;
1123         case OBJ_SEARCH_PARTIAL_KEY:
1124                 /* Not supported by container */
1125                 ast_assert(0);
1126                 cmp = -1;
1127                 break;
1128         default:
1129                 /*
1130                  * What arg points to is specific to this traversal callback
1131                  * and has no special meaning to astobj2.
1132                  */
1133                 cmp = 0;
1134                 break;
1135         }
1136         if (cmp) {
1137                 return 0;
1138         }
1139         /*
1140          * At this point the traversal callback is identical to a sorted
1141          * container.
1142          */
1143         return CMP_MATCH;
1144 }
1145
1146 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1147 {
1148         struct stasis_topic_pool *pool;
1149
1150         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1151         if (!pool) {
1152                 return NULL;
1153         }
1154
1155         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1156                 topic_pool_entry_hash, topic_pool_entry_cmp);
1157         if (!pool->pool_container) {
1158                 ao2_cleanup(pool);
1159                 return NULL;
1160         }
1161         ao2_ref(pooled_topic, +1);
1162         pool->pool_topic = pooled_topic;
1163
1164         return pool;
1165 }
1166
1167 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1168 {
1169         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1170         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1171
1172         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1173         if (topic_pool_entry) {
1174                 return topic_pool_entry->topic;
1175         }
1176
1177         topic_pool_entry = topic_pool_entry_alloc();
1178         if (!topic_pool_entry) {
1179                 return NULL;
1180         }
1181
1182         topic_pool_entry->topic = stasis_topic_create(topic_name);
1183         if (!topic_pool_entry->topic) {
1184                 return NULL;
1185         }
1186
1187         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1188         if (!topic_pool_entry->forward) {
1189                 return NULL;
1190         }
1191
1192         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1193                 return NULL;
1194         }
1195
1196         return topic_pool_entry->topic;
1197 }
1198
1199 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1200 {
1201         struct topic_pool_entry *topic_pool_entry;
1202
1203         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1204         if (!topic_pool_entry) {
1205                 return 0;
1206         }
1207
1208         ao2_ref(topic_pool_entry, -1);
1209         return 1;
1210 }
1211
/*!
 * \brief Log an access to a message type accessor that is not available.
 *
 * Only active when built with AST_DEVMODE, and silent for message types
 * reported as declined by stasis_message_type_declined().
 *
 * \param name Name of the accessor being used
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
	if (stasis_message_type_declined(name)) {
		return;
	}

	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1220
1221 /*! \brief A multi object blob data structure to carry user event stasis messages */
1222 struct ast_multi_object_blob {
1223         struct ast_json *blob;                             /*< A blob of JSON data */
1224         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1225 };
1226
1227 /*!
1228  * \internal
1229  * \brief Destructor for \ref ast_multi_object_blob objects
1230  */
1231 static void multi_object_blob_dtor(void *obj)
1232 {
1233         struct ast_multi_object_blob *multi = obj;
1234         int type;
1235         int i;
1236
1237         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1238                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1239                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1240                 }
1241                 AST_VECTOR_FREE(&multi->snapshots[type]);
1242         }
1243         ast_json_unref(multi->blob);
1244 }
1245
1246 /*! \brief Create a stasis user event multi object blob */
1247 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1248 {
1249         int type;
1250         struct ast_multi_object_blob *multi;
1251
1252         ast_assert(blob != NULL);
1253
1254         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1255         if (!multi) {
1256                 return NULL;
1257         }
1258
1259         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1260                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1261                         ao2_ref(multi, -1);
1262
1263                         return NULL;
1264                 }
1265         }
1266
1267         multi->blob = ast_json_ref(blob);
1268
1269         return multi;
1270 }
1271
1272 /*! \brief Add an object (snapshot) to the blob */
1273 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1274         enum stasis_user_multi_object_snapshot_type type, void *object)
1275 {
1276         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1277                 ao2_cleanup(object);
1278         }
1279 }
1280
1281 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1282 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1283         struct stasis_message_type *type, struct ast_json *blob)
1284 {
1285         struct stasis_message *message;
1286         struct ast_channel_snapshot *channel_snapshot;
1287         struct ast_multi_object_blob *multi;
1288
1289         if (!type) {
1290                 return;
1291         }
1292
1293         multi = ast_multi_object_blob_create(blob);
1294         if (!multi) {
1295                 return;
1296         }
1297
1298         channel_snapshot = ast_channel_snapshot_create(chan);
1299         if (!channel_snapshot) {
1300                 ao2_ref(multi, -1);
1301                 return;
1302         }
1303
1304         /* this call steals the channel_snapshot reference */
1305         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1306
1307         message = stasis_message_create(type, multi);
1308         ao2_ref(multi, -1);
1309         if (message) {
1310                 /* app_userevent still publishes to channel */
1311                 stasis_publish(ast_channel_topic(chan), message);
1312                 ao2_ref(message, -1);
1313         }
1314 }
1315
1316 /*! \internal \brief convert multi object blob to ari json */
1317 static struct ast_json *multi_user_event_to_json(
1318         struct stasis_message *message,
1319         const struct stasis_message_sanitizer *sanitize)
1320 {
1321         struct ast_json *out;
1322         struct ast_multi_object_blob *multi = stasis_message_data(message);
1323         struct ast_json *blob = multi->blob;
1324         const struct timeval *tv = stasis_message_timestamp(message);
1325         enum stasis_user_multi_object_snapshot_type type;
1326         int i;
1327
1328         out = ast_json_object_create();
1329         if (!out) {
1330                 return NULL;
1331         }
1332
1333         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1334         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1335         ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
1336         ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
1337
1338         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1339                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1340                         struct ast_json *json_object = NULL;
1341                         char *name = NULL;
1342                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1343
1344                         switch (type) {
1345                         case STASIS_UMOS_CHANNEL:
1346                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1347                                 name = "channel";
1348                                 break;
1349                         case STASIS_UMOS_BRIDGE:
1350                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1351                                 name = "bridge";
1352                                 break;
1353                         case STASIS_UMOS_ENDPOINT:
1354                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1355                                 name = "endpoint";
1356                                 break;
1357                         }
1358                         if (json_object) {
1359                                 ast_json_object_set(out, name, json_object);
1360                         }
1361                 }
1362         }
1363
1364         return out;
1365 }
1366
1367 /*! \internal \brief convert multi object blob to ami string */
1368 static struct ast_str *multi_object_blob_to_ami(void *obj)
1369 {
1370         struct ast_str *ami_str=ast_str_create(1024);
1371         struct ast_str *ami_snapshot;
1372         const struct ast_multi_object_blob *multi = obj;
1373         enum stasis_user_multi_object_snapshot_type type;
1374         int i;
1375
1376         if (!ami_str) {
1377                 return NULL;
1378         }
1379         if (!multi) {
1380                 ast_free(ami_str);
1381                 return NULL;
1382         }
1383
1384         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1385                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1386                         char *name = NULL;
1387                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1388                         ami_snapshot = NULL;
1389
1390                         if (i > 0) {
1391                                 ast_asprintf(&name, "%d", i + 1);
1392                         }
1393
1394                         switch (type) {
1395                         case STASIS_UMOS_CHANNEL:
1396                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1397                                 break;
1398
1399                         case STASIS_UMOS_BRIDGE:
1400                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1401                                 break;
1402
1403                         case STASIS_UMOS_ENDPOINT:
1404                                 /* currently not sending endpoint snapshots to AMI */
1405                                 break;
1406                         }
1407                         if (ami_snapshot) {
1408                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1409                                 ast_free(ami_snapshot);
1410                         }
1411                         ast_free(name);
1412                 }
1413         }
1414
1415         return ami_str;
1416 }
1417
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key being considered
 *
 * \return 1 if \a key is "eventname"
 * \return 0 for every other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0 ? 1 : 0;
}
1426
1427 static struct ast_manager_event_blob *multi_user_event_to_ami(
1428         struct stasis_message *message)
1429 {
1430         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1431         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1432         struct ast_multi_object_blob *multi = stasis_message_data(message);
1433         const char *eventname;
1434
1435         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1436         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1437         object_string = multi_object_blob_to_ami(multi);
1438         if (!object_string || !body) {
1439                 return NULL;
1440         }
1441
1442         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1443                 "%s"
1444                 "UserEvent: %s\r\n"
1445                 "%s",
1446                 ast_str_buffer(object_string),
1447                 eventname,
1448                 ast_str_buffer(body));
1449 }
1450
/*! \brief Configuration of the message types that are declined */
struct stasis_declined_config {
	/*! Container holding the message types to decline */
	struct ao2_container *declined;
};
1456
/*! \brief Configuration options for the thread pool */
struct stasis_threadpool_conf {
	/*! Size the thread pool starts with */
	int initial_size;
	/*! Seconds before a thread is expired */
	int idle_timeout_sec;
	/*! Upper bound on the number of threads */
	int max_size;
};
1466
/*! \brief Top-level stasis configuration object */
struct stasis_config {
	/*! Options for the thread pool */
	struct stasis_threadpool_conf *threadpool_options;
	/*! Message types that are declined */
	struct stasis_declined_config *declined_message_types;
};
1473
1474 static struct aco_type threadpool_option = {
1475         .type = ACO_GLOBAL,
1476         .name = "threadpool",
1477         .item_offset = offsetof(struct stasis_config, threadpool_options),
1478         .category = "threadpool",
1479         .category_match = ACO_WHITELIST_EXACT,
1480 };
1481
1482 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1483
1484 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1485 static struct aco_type declined_option = {
1486         .type = ACO_GLOBAL,
1487         .name = "declined_message_types",
1488         .item_offset = offsetof(struct stasis_config, declined_message_types),
1489         .category_match = ACO_WHITELIST_EXACT,
1490         .category = "declined_message_types",
1491 };
1492
1493 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1494
1495 struct aco_file stasis_conf = {
1496         .filename = "stasis.conf",
1497         .types = ACO_TYPES(&declined_option, &threadpool_option),
1498 };
1499
1500 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1501 static AO2_GLOBAL_OBJ_STATIC(globals);
1502
1503 static void *stasis_config_alloc(void);
1504
1505 /*! \brief Register information about the configs being processed by this module */
1506 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1507         .files = ACO_FILES(&stasis_conf),
1508 );
1509
1510 static void stasis_declined_config_destructor(void *obj)
1511 {
1512         struct stasis_declined_config *declined = obj;
1513
1514         ao2_cleanup(declined->declined);
1515 }
1516
1517 static void stasis_config_destructor(void *obj)
1518 {
1519         struct stasis_config *cfg = obj;
1520
1521         ao2_cleanup(cfg->declined_message_types);
1522         ast_free(cfg->threadpool_options);
1523 }
1524
1525 static void *stasis_config_alloc(void)
1526 {
1527         struct stasis_config *cfg;
1528
1529         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1530                 return NULL;
1531         }
1532
1533         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1534         if (!cfg->threadpool_options) {
1535                 ao2_ref(cfg, -1);
1536                 return NULL;
1537         }
1538
1539         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1540                 stasis_declined_config_destructor);
1541         if (!cfg->declined_message_types) {
1542                 ao2_ref(cfg, -1);
1543                 return NULL;
1544         }
1545
1546         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1547         if (!cfg->declined_message_types->declined) {
1548                 ao2_ref(cfg, -1);
1549                 return NULL;
1550         }
1551
1552         return cfg;
1553 }
1554
1555 int stasis_message_type_declined(const char *name)
1556 {
1557         struct stasis_config *cfg = ao2_global_obj_ref(globals);
1558         char *name_in_declined;
1559         int res;
1560
1561         if (!cfg || !cfg->declined_message_types) {
1562                 ao2_cleanup(cfg);
1563                 return 0;
1564         }
1565
1566         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1567         res = name_in_declined ? 1 : 0;
1568         ao2_cleanup(name_in_declined);
1569         ao2_ref(cfg, -1);
1570         if (res) {
1571                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1572         }
1573         return res;
1574 }
1575
1576 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1577 {
1578         struct stasis_declined_config *declined = obj;
1579
1580         if (ast_strlen_zero(var->value)) {
1581                 return 0;
1582         }
1583
1584         if (ast_str_container_add(declined->declined, var->value)) {
1585                 return -1;
1586         }
1587
1588         return 0;
1589 }
1590
1591 /*!
1592  * @{ \brief Define multi user event message type(s).
1593  */
1594
1595 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1596         .to_json = multi_user_event_to_json,
1597         .to_ami = multi_user_event_to_ami,
1598         );
1599
1600 /*! @} */
1601
1602 /*! \brief Cleanup function for graceful shutdowns */
1603 static void stasis_cleanup(void)
1604 {
1605         ast_threadpool_shutdown(pool);
1606         pool = NULL;
1607         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1608         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1609         aco_info_destroy(&cfg_info);
1610         ao2_global_obj_release(globals);
1611 }
1612
1613 int stasis_init(void)
1614 {
1615         struct stasis_config *cfg;
1616         int cache_init;
1617         struct ast_threadpool_options threadpool_opts = { 0, };
1618
1619         /* Be sure the types are cleaned up after the message bus */
1620         ast_register_cleanup(stasis_cleanup);
1621
1622         if (aco_info_init(&cfg_info)) {
1623                 return -1;
1624         }
1625
1626         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1627                 declined_options, "", declined_handler, 0);
1628         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1629                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1630                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1631                 INT_MAX);
1632         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1633                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1634                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1635                 INT_MAX);
1636         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1637                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1638                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1639                 INT_MAX);
1640
1641         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1642                 struct stasis_config *default_cfg = stasis_config_alloc();
1643
1644                 if (!default_cfg) {
1645                         return -1;
1646                 }
1647
1648                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1649                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1650                         ao2_ref(default_cfg, -1);
1651
1652                         return -1;
1653                 }
1654
1655                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1656                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1657                         ao2_ref(default_cfg, -1);
1658
1659                         return -1;
1660                 }
1661
1662                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1663                 ao2_global_obj_replace_unref(globals, default_cfg);
1664                 cfg = default_cfg;
1665         } else {
1666                 cfg = ao2_global_obj_ref(globals);
1667                 if (!cfg) {
1668                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1669
1670                         return -1;
1671                 }
1672         }
1673
1674         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1675         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1676         threadpool_opts.auto_increment = 1;
1677         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1678         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1679         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1680         ao2_ref(cfg, -1);
1681         if (!pool) {
1682                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1683
1684                 return -1;
1685         }
1686
1687         cache_init = stasis_cache_init();
1688         if (cache_init != 0) {
1689                 return -1;
1690         }
1691
1692         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1693                 return -1;
1694         }
1695         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1696                 return -1;
1697         }
1698
1699         return 0;
1700 }