ed838733b99b286b56c8a476b7f1f4465dae75d3
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subscription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has been explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there is a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how they
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
/*! Capacity initially reserved for a topic's subscribers vector. */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! Bucket count used for topic pools. */
#define TOPIC_POOL_BUCKETS 57

/*! Thread pool that serves subscriptions created without a dedicated taskprocessor. */
static struct ast_threadpool *pool;

/*! Message type used for subscription change notifications (e.g. "Unsubscribe"). */
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
307 /*! \internal */
308 struct stasis_topic {
309         char *name;
310         /*! Variable length array of the subscribers */
311         AST_VECTOR(, struct stasis_subscription *) subscribers;
312
313         /*! Topics forwarding into this topic */
314         AST_VECTOR(, struct stasis_topic *) upstream_topics;
315 };
316
/*
 * Prototypes for the topic-side subscriber list helpers. The subscription
 * code below calls them before their definitions appear.
 */
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static int topic_remove_subscription(struct stasis_topic *topic,
	struct stasis_subscription *sub);
322
/*!
 * \brief Acquire the locks of two topics.
 *
 * \a topic1 is locked first. \a topic2 is then taken with a try-lock, and
 * AO2_DEADLOCK_AVOIDANCE() is run on \a topic1 after every failed attempt.
 * Both arguments are expanded more than once, so pass expressions without
 * side effects.
 */
#define topic_lock_both(topic1, topic2) \
	do { \
		ao2_lock(topic1); \
		while (ao2_trylock(topic2) != 0) { \
			AO2_DEADLOCK_AVOIDANCE(topic1); \
		} \
	} while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 /*! \internal */
374 struct stasis_subscription {
375         /*! Unique ID for this subscription */
376         char uniqueid[AST_UUID_STR_LEN];
377         /*! Topic subscribed to. */
378         struct stasis_topic *topic;
379         /*! Mailbox for processing incoming messages. */
380         struct ast_taskprocessor *mailbox;
381         /*! Callback function for incoming message processing. */
382         stasis_subscription_cb callback;
383         /*! Data pointer to be handed to the callback. */
384         void *data;
385
386         /*! Condition for joining with subscription. */
387         ast_cond_t join_cond;
388         /*! Flag set when final message for sub has been received.
389          *  Be sure join_lock is held before reading/setting. */
390         int final_message_rxed;
391         /*! Flag set when final message for sub has been processed.
392          *  Be sure join_lock is held before reading/setting. */
393         int final_message_processed;
394 };
395
396 static void subscription_dtor(void *obj)
397 {
398         struct stasis_subscription *sub = obj;
399
400         /* Subscriptions need to be manually unsubscribed before destruction
401          * b/c there's a cyclic reference between topics and subscriptions */
402         ast_assert(!stasis_subscription_is_subscribed(sub));
403         /* If there are any messages in flight to this subscription; that would
404          * be bad. */
405         ast_assert(stasis_subscription_is_done(sub));
406
407         ao2_cleanup(sub->topic);
408         sub->topic = NULL;
409         ast_taskprocessor_unreference(sub->mailbox);
410         sub->mailbox = NULL;
411         ast_cond_destroy(&sub->join_cond);
412 }
413
414 /*!
415  * \brief Invoke the subscription's callback.
416  * \param sub Subscription to invoke.
417  * \param topic Topic message was published to.
418  * \param message Message to send.
419  */
420 static void subscription_invoke(struct stasis_subscription *sub,
421                                   struct stasis_message *message)
422 {
423         /* Notify that the final message has been received */
424         if (stasis_subscription_final_message(sub, message)) {
425                 ao2_lock(sub);
426                 sub->final_message_rxed = 1;
427                 ast_cond_signal(&sub->join_cond);
428                 ao2_unlock(sub);
429         }
430
431         /* Since sub is mostly immutable, no need to lock sub */
432         sub->callback(sub->data, sub, message);
433
434         /* Notify that the final message has been processed */
435         if (stasis_subscription_final_message(sub, message)) {
436                 ao2_lock(sub);
437                 sub->final_message_processed = 1;
438                 ast_cond_signal(&sub->join_cond);
439                 ao2_unlock(sub);
440         }
441 }
442
/* Prototypes for the subscription change notifiers used by subscribe/unsubscribe below. */
static void send_subscription_subscribe(struct stasis_topic *topic,
	struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic,
	struct stasis_subscription *sub);
445
/*!
 * \brief Subscription callback that ignores every message.
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	/* Intentionally empty. */
	return;
}
449
450 struct stasis_subscription *internal_stasis_subscribe(
451         struct stasis_topic *topic,
452         stasis_subscription_cb callback,
453         void *data,
454         int needs_mailbox,
455         int use_thread_pool)
456 {
457         struct stasis_subscription *sub;
458
459         if (!topic) {
460                 return NULL;
461         }
462
463         /* The ao2 lock is used for join_cond. */
464         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
465         if (!sub) {
466                 return NULL;
467         }
468         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
469
470         if (needs_mailbox) {
471                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
472
473                 /* Create name with seq number appended. */
474                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
475                         use_thread_pool ? 'p' : 'm',
476                         stasis_topic_name(topic));
477
478                 /*
479                  * With a small number of subscribers, a thread-per-sub is
480                  * acceptable. For a large number of subscribers, a thread
481                  * pool should be used.
482                  */
483                 if (use_thread_pool) {
484                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
485                 } else {
486                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
487                 }
488                 if (!sub->mailbox) {
489                         ao2_ref(sub, -1);
490
491                         return NULL;
492                 }
493                 ast_taskprocessor_set_local(sub->mailbox, sub);
494                 /* Taskprocessor has a reference */
495                 ao2_ref(sub, +1);
496         }
497
498         ao2_ref(topic, +1);
499         sub->topic = topic;
500         sub->callback = callback;
501         sub->data = data;
502         ast_cond_init(&sub->join_cond, NULL);
503
504         if (topic_add_subscription(topic, sub) != 0) {
505                 ao2_ref(sub, -1);
506
507                 return NULL;
508         }
509         send_subscription_subscribe(topic, sub);
510
511         return sub;
512 }
513
514 struct stasis_subscription *stasis_subscribe(
515         struct stasis_topic *topic,
516         stasis_subscription_cb callback,
517         void *data)
518 {
519         return internal_stasis_subscribe(topic, callback, data, 1, 0);
520 }
521
522 struct stasis_subscription *stasis_subscribe_pool(
523         struct stasis_topic *topic,
524         stasis_subscription_cb callback,
525         void *data)
526 {
527         return internal_stasis_subscribe(topic, callback, data, 1, 1);
528 }
529
/*!
 * \internal
 * \brief Taskprocessor task that drops one reference on a subscription.
 * \param data The subscription whose reference is released.
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
	struct stasis_subscription *released = data;

	ao2_cleanup(released);

	return 0;
}
536
537 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
538 {
539         /* The subscription may be the last ref to this topic. Hold
540          * the topic ref open until after the unlock. */
541         struct stasis_topic *topic;
542
543         if (!sub) {
544                 return NULL;
545         }
546
547         topic = ao2_bump(sub->topic);
548
549         /* We have to remove the subscription first, to ensure the unsubscribe
550          * is the final message */
551         if (topic_remove_subscription(sub->topic, sub) != 0) {
552                 ast_log(LOG_ERROR,
553                         "Internal error: subscription has invalid topic\n");
554                 ao2_cleanup(topic);
555
556                 return NULL;
557         }
558
559         /* Now let everyone know about the unsubscribe */
560         send_subscription_unsubscribe(topic, sub);
561
562         /* When all that's done, remove the ref the mailbox has on the sub */
563         if (sub->mailbox) {
564                 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
565                         /* Nothing we can do here, the conditional is just to keep
566                          * the compiler happy that we're not ignoring the result. */
567                 }
568         }
569
570         /* Unsubscribing unrefs the subscription */
571         ao2_cleanup(sub);
572         ao2_cleanup(topic);
573
574         return NULL;
575 }
576
577 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
578         long low_water, long high_water)
579 {
580         int res = -1;
581
582         if (subscription) {
583                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
584                         low_water, high_water);
585         }
586         return res;
587 }
588
589 void stasis_subscription_join(struct stasis_subscription *subscription)
590 {
591         if (subscription) {
592                 ao2_lock(subscription);
593                 /* Wait until the processed flag has been set */
594                 while (!subscription->final_message_processed) {
595                         ast_cond_wait(&subscription->join_cond,
596                                 ao2_object_get_lockaddr(subscription));
597                 }
598                 ao2_unlock(subscription);
599         }
600 }
601
602 int stasis_subscription_is_done(struct stasis_subscription *subscription)
603 {
604         if (subscription) {
605                 int ret;
606
607                 ao2_lock(subscription);
608                 ret = subscription->final_message_rxed;
609                 ao2_unlock(subscription);
610
611                 return ret;
612         }
613
614         /* Null subscription is about as done as you can get */
615         return 1;
616 }
617
/*!
 * \brief Unsubscribe, then wait for the final message to be processed.
 * \param subscription Subscription to cancel; \c NULL is accepted and ignored.
 * \return Always \c NULL.
 */
struct stasis_subscription *stasis_unsubscribe_and_join(
	struct stasis_subscription *subscription)
{
	if (subscription) {
		/* stasis_unsubscribe() drops a reference, so take a temporary
		 * one to keep the object valid for the join. */
		ao2_ref(subscription, +1);
		stasis_unsubscribe(subscription);
		stasis_subscription_join(subscription);
		/* Release the temporary reference */
		ao2_cleanup(subscription);
	}

	return NULL;
}
633
634 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
635 {
636         if (sub) {
637                 size_t i;
638                 struct stasis_topic *topic = sub->topic;
639
640                 ao2_lock(topic);
641                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
642                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
643                                 ao2_unlock(topic);
644                                 return 1;
645                         }
646                 }
647                 ao2_unlock(topic);
648         }
649
650         return 0;
651 }
652
/*!
 * \brief Get the unique ID of a subscription.
 * \param sub Subscription to query; dereferenced without a NULL check
 * \return The subscription's uniqueid field
 */
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
{
        return sub->uniqueid;
}
657
658 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
659 {
660         struct stasis_subscription_change *change;
661
662         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
663                 return 0;
664         }
665
666         change = stasis_message_data(msg);
667         if (strcmp("Unsubscribe", change->description)) {
668                 return 0;
669         }
670
671         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
672                 return 0;
673         }
674
675         return 1;
676 }
677
678 /*!
679  * \brief Add a subscriber to a topic.
680  * \param topic Topic
681  * \param sub Subscriber
682  * \return 0 on success
683  * \return Non-zero on error
684  */
685 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
686 {
687         size_t idx;
688
689         ao2_lock(topic);
690         /* The reference from the topic to the subscription is shared with
691          * the owner of the subscription, which will explicitly unsubscribe
692          * to release it.
693          *
694          * If we bumped the refcount here, the owner would have to unsubscribe
695          * and cleanup, which is a bit awkward. */
696         AST_VECTOR_APPEND(&topic->subscribers, sub);
697
698         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
699                 topic_add_subscription(
700                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
701         }
702         ao2_unlock(topic);
703
704         return 0;
705 }
706
707 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
708 {
709         size_t idx;
710         int res;
711
712         ao2_lock(topic);
713         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
714                 topic_remove_subscription(
715                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
716         }
717         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
718                 AST_VECTOR_ELEM_CLEANUP_NOOP);
719         ao2_unlock(topic);
720
721         return res;
722 }
723
724 /*!
725  * \internal \brief Dispatch a message to a subscriber asynchronously
726  * \param local \ref ast_taskprocessor_local object
727  * \return 0
728  */
729 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
730 {
731         struct stasis_subscription *sub = local->local_data;
732         struct stasis_message *message = local->data;
733
734         subscription_invoke(sub, message);
735         ao2_cleanup(message);
736
737         return 0;
738 }
739
/*!
 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
 * a published message to a subscriber
 */
struct sync_task_data {
        /*! Protects \a complete; held while waiting on and signalling \a cond */
        ast_mutex_t lock;
        /*! Signalled by dispatch_exec_sync() once the message has been handled */
        ast_cond_t cond;
        /*! Set to 1 by dispatch_exec_sync() when the dispatch has finished */
        int complete;
        /*! The message being dispatched */
        void *task_data;
};
750
751 /*!
752  * \internal \brief Dispatch a message to a subscriber synchronously
753  * \param local \ref ast_taskprocessor_local object
754  * \return 0
755  */
756 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
757 {
758         struct stasis_subscription *sub = local->local_data;
759         struct sync_task_data *std = local->data;
760         struct stasis_message *message = std->task_data;
761
762         subscription_invoke(sub, message);
763         ao2_cleanup(message);
764
765         ast_mutex_lock(&std->lock);
766         std->complete = 1;
767         ast_cond_signal(&std->cond);
768         ast_mutex_unlock(&std->lock);
769
770         return 0;
771 }
772
773 /*!
774  * \internal \brief Dispatch a message to a subscriber
775  * \param sub The subscriber to dispatch to
776  * \param message The message to send
777  * \param synchronous If non-zero, synchronize on the subscriber receiving
778  * the message
779  */
780 static void dispatch_message(struct stasis_subscription *sub,
781         struct stasis_message *message,
782         int synchronous)
783 {
784         if (!sub->mailbox) {
785                 /* Dispatch directly */
786                 subscription_invoke(sub, message);
787                 return;
788         }
789
790         /* Bump the message for the taskprocessor push. This will get de-ref'd
791          * by the task processor callback.
792          */
793         ao2_bump(message);
794         if (!synchronous) {
795                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
796                         /* Push failed; ugh. */
797                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
798                         ao2_cleanup(message);
799                 }
800         } else {
801                 struct sync_task_data std;
802
803                 ast_mutex_init(&std.lock);
804                 ast_cond_init(&std.cond, NULL);
805                 std.complete = 0;
806                 std.task_data = message;
807
808                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
809                         /* Push failed; ugh. */
810                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
811                         ao2_cleanup(message);
812                         ast_mutex_destroy(&std.lock);
813                         ast_cond_destroy(&std.cond);
814                         return;
815                 }
816
817                 ast_mutex_lock(&std.lock);
818                 while (!std.complete) {
819                         ast_cond_wait(&std.cond, &std.lock);
820                 }
821                 ast_mutex_unlock(&std.lock);
822
823                 ast_mutex_destroy(&std.lock);
824                 ast_cond_destroy(&std.cond);
825         }
826 }
827
828 /*!
829  * \internal \brief Publish a message to a topic's subscribers
830  * \brief topic The topic to publish to
831  * \brief message The message to publish
832  * \brief sync_sub An optional subscriber of the topic to publish synchronously
833  * to
834  */
835 static void publish_msg(struct stasis_topic *topic,
836         struct stasis_message *message, struct stasis_subscription *sync_sub)
837 {
838         size_t i;
839
840         ast_assert(topic != NULL);
841         ast_assert(message != NULL);
842
843         /*
844          * The topic may be unref'ed by the subscription invocation.
845          * Make sure we hold onto a reference while dispatching.
846          */
847         ao2_ref(topic, +1);
848         ao2_lock(topic);
849         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
850                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
851
852                 ast_assert(sub != NULL);
853
854                 dispatch_message(sub, message, (sub == sync_sub));
855         }
856         ao2_unlock(topic);
857         ao2_ref(topic, -1);
858 }
859
/*!
 * \brief Publish a message to a topic's subscribers.
 *
 * No subscriber is dispatched to synchronously (NULL is passed as the
 * synchronous subscriber).
 *
 * \param topic Topic to publish to
 * \param message Message to publish
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
        publish_msg(topic, message, NULL);
}
864
/*!
 * \brief Publish a message to a subscription's topic, dispatching
 * synchronously to that subscription.
 *
 * \param sub Subscription to synchronize on; its topic receives the message
 * \param message Message to publish
 */
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
{
        ast_assert(sub != NULL);

        /* The message goes to every subscriber of sub->topic; only sub is waited on */
        publish_msg(sub->topic, message, sub);
}
871
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * In cases where both the \a from_topic and \a to_topic need to be locked,
 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
 *
 * Both topic pointers are released by forward_dtor() when the forward is
 * destroyed.
 */
struct stasis_forward {
        /*! Originating topic */
        struct stasis_topic *from_topic;
        /*! Destination topic */
        struct stasis_topic *to_topic;
};
886
887 static void forward_dtor(void *obj)
888 {
889         struct stasis_forward *forward = obj;
890
891         ao2_cleanup(forward->from_topic);
892         forward->from_topic = NULL;
893         ao2_cleanup(forward->to_topic);
894         forward->to_topic = NULL;
895 }
896
897 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
898 {
899         int idx;
900         struct stasis_topic *from;
901         struct stasis_topic *to;
902
903         if (!forward) {
904                 return NULL;
905         }
906
907         from = forward->from_topic;
908         to = forward->to_topic;
909
910         if (from && to) {
911                 topic_lock_both(to, from);
912                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
913                         AST_VECTOR_ELEM_CLEANUP_NOOP);
914
915                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
916                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
917                 }
918                 ao2_unlock(from);
919                 ao2_unlock(to);
920         }
921
922         ao2_cleanup(forward);
923
924         return NULL;
925 }
926
927 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
928         struct stasis_topic *to_topic)
929 {
930         int res;
931         size_t idx;
932         struct stasis_forward *forward;
933
934         if (!from_topic || !to_topic) {
935                 return NULL;
936         }
937
938         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
939         if (!forward) {
940                 return NULL;
941         }
942
943         /* Forwards to ourselves are implicit. */
944         if (to_topic == from_topic) {
945                 return forward;
946         }
947
948         forward->from_topic = ao2_bump(from_topic);
949         forward->to_topic = ao2_bump(to_topic);
950
951         topic_lock_both(to_topic, from_topic);
952         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
953         if (res != 0) {
954                 ao2_unlock(from_topic);
955                 ao2_unlock(to_topic);
956                 ao2_ref(forward, -1);
957                 return NULL;
958         }
959
960         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
961                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
962         }
963         ao2_unlock(from_topic);
964         ao2_unlock(to_topic);
965
966         return forward;
967 }
968
969 static void subscription_change_dtor(void *obj)
970 {
971         struct stasis_subscription_change *change = obj;
972
973         ast_string_field_free_memory(change);
974         ao2_cleanup(change->topic);
975 }
976
977 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
978 {
979         struct stasis_subscription_change *change;
980
981         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
982         if (!change || ast_string_field_init(change, 128)) {
983                 ao2_cleanup(change);
984                 return NULL;
985         }
986
987         ast_string_field_set(change, uniqueid, uniqueid);
988         ast_string_field_set(change, description, description);
989         ao2_ref(topic, +1);
990         change->topic = topic;
991
992         return change;
993 }
994
995 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
996 {
997         struct stasis_subscription_change *change;
998         struct stasis_message *msg;
999
1000         /* This assumes that we have already unsubscribed */
1001         ast_assert(stasis_subscription_is_subscribed(sub));
1002
1003         if (!stasis_subscription_change_type()) {
1004                 return;
1005         }
1006
1007         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1008         if (!change) {
1009                 return;
1010         }
1011
1012         msg = stasis_message_create(stasis_subscription_change_type(), change);
1013         if (!msg) {
1014                 ao2_cleanup(change);
1015                 return;
1016         }
1017
1018         stasis_publish(topic, msg);
1019         ao2_cleanup(msg);
1020         ao2_cleanup(change);
1021 }
1022
1023 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1024         struct stasis_subscription *sub)
1025 {
1026         struct stasis_subscription_change *change;
1027         struct stasis_message *msg;
1028
1029         /* This assumes that we have already unsubscribed */
1030         ast_assert(!stasis_subscription_is_subscribed(sub));
1031
1032         if (!stasis_subscription_change_type()) {
1033                 return;
1034         }
1035
1036         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1037         if (!change) {
1038                 return;
1039         }
1040
1041         msg = stasis_message_create(stasis_subscription_change_type(), change);
1042         if (!msg) {
1043                 ao2_cleanup(change);
1044                 return;
1045         }
1046
1047         stasis_publish(topic, msg);
1048
1049         /* Now we have to dispatch to the subscription itself */
1050         dispatch_message(sub, msg, 0);
1051
1052         ao2_cleanup(msg);
1053         ao2_cleanup(change);
1054 }
1055
/*! \brief A pooled topic together with its forward to the pool's topic */
struct topic_pool_entry {
        /*! Forward created for \a topic; cancelled by topic_pool_entry_dtor() */
        struct stasis_forward *forward;
        /*! The pooled topic; released by topic_pool_entry_dtor() */
        struct stasis_topic *topic;
};
1060
1061 static void topic_pool_entry_dtor(void *obj)
1062 {
1063         struct topic_pool_entry *entry = obj;
1064
1065         entry->forward = stasis_forward_cancel(entry->forward);
1066         ao2_cleanup(entry->topic);
1067         entry->topic = NULL;
1068 }
1069
1070 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1071 {
1072         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1073                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1074 }
1075
/*! \brief A pool of named topics that all forward to one topic */
struct stasis_topic_pool {
        /*! Container of \ref topic_pool_entry objects, keyed by topic name */
        struct ao2_container *pool_container;
        /*! Topic that pooled topics are forwarded to */
        struct stasis_topic *pool_topic;
};
1080
1081 static void topic_pool_dtor(void *obj)
1082 {
1083         struct stasis_topic_pool *pool = obj;
1084
1085 #ifdef AO2_DEBUG
1086         {
1087                 char *container_name =
1088                         ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1089                 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1090                 ao2_container_unregister(container_name);
1091         }
1092 #endif
1093
1094         ao2_cleanup(pool->pool_container);
1095         pool->pool_container = NULL;
1096         ao2_cleanup(pool->pool_topic);
1097         pool->pool_topic = NULL;
1098 }
1099
1100 static int topic_pool_entry_hash(const void *obj, const int flags)
1101 {
1102         const struct topic_pool_entry *object;
1103         const char *key;
1104
1105         switch (flags & OBJ_SEARCH_MASK) {
1106         case OBJ_SEARCH_KEY:
1107                 key = obj;
1108                 break;
1109         case OBJ_SEARCH_OBJECT:
1110                 object = obj;
1111                 key = stasis_topic_name(object->topic);
1112                 break;
1113         default:
1114                 /* Hash can only work on something with a full key. */
1115                 ast_assert(0);
1116                 return 0;
1117         }
1118         return ast_str_case_hash(key);
1119 }
1120
1121 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1122 {
1123         const struct topic_pool_entry *object_left = obj;
1124         const struct topic_pool_entry *object_right = arg;
1125         const char *right_key = arg;
1126         int cmp;
1127
1128         switch (flags & OBJ_SEARCH_MASK) {
1129         case OBJ_SEARCH_OBJECT:
1130                 right_key = stasis_topic_name(object_right->topic);
1131                 /* Fall through */
1132         case OBJ_SEARCH_KEY:
1133                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1134                 break;
1135         case OBJ_SEARCH_PARTIAL_KEY:
1136                 /* Not supported by container */
1137                 ast_assert(0);
1138                 cmp = -1;
1139                 break;
1140         default:
1141                 /*
1142                  * What arg points to is specific to this traversal callback
1143                  * and has no special meaning to astobj2.
1144                  */
1145                 cmp = 0;
1146                 break;
1147         }
1148         if (cmp) {
1149                 return 0;
1150         }
1151         /*
1152          * At this point the traversal callback is identical to a sorted
1153          * container.
1154          */
1155         return CMP_MATCH;
1156 }
1157
#ifdef AO2_DEBUG
/*!
 * \internal \brief Print a topic pool entry for container debugging
 *
 * \param v_obj The \ref topic_pool_entry to print; may be NULL
 * \param where Opaque destination handed through to \a prnt
 * \param prnt Print callback
 */
static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
        struct topic_pool_entry *pooled = v_obj;

        if (pooled) {
                prnt(where, "%s", stasis_topic_name(pooled->topic));
        }
}
#endif
1169
1170 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1171 {
1172         struct stasis_topic_pool *pool;
1173
1174         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1175         if (!pool) {
1176                 return NULL;
1177         }
1178
1179         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1180                 topic_pool_entry_hash, topic_pool_entry_cmp);
1181         if (!pool->pool_container) {
1182                 ao2_cleanup(pool);
1183                 return NULL;
1184         }
1185
1186 #ifdef AO2_DEBUG
1187         {
1188                 char *container_name =
1189                         ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1190                 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1191                 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1192         }
1193 #endif
1194
1195         ao2_ref(pooled_topic, +1);
1196         pool->pool_topic = pooled_topic;
1197
1198         return pool;
1199 }
1200
/*!
 * \brief Remove a topic from a topic pool by name.
 * \param pool Pool to remove from; dereferenced without a NULL check
 * \param topic_name Name of the topic to unlink from the pool's container
 */
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
{
        /* OBJ_NODATA: nothing is returned, so there is no reference to release */
        ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
}
1205
1206 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1207 {
1208         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1209         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1210
1211         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1212         if (topic_pool_entry) {
1213                 return topic_pool_entry->topic;
1214         }
1215
1216         topic_pool_entry = topic_pool_entry_alloc();
1217         if (!topic_pool_entry) {
1218                 return NULL;
1219         }
1220
1221         topic_pool_entry->topic = stasis_topic_create(topic_name);
1222         if (!topic_pool_entry->topic) {
1223                 return NULL;
1224         }
1225
1226         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1227         if (!topic_pool_entry->forward) {
1228                 return NULL;
1229         }
1230
1231         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1232                 return NULL;
1233         }
1234
1235         return topic_pool_entry->topic;
1236 }
1237
1238 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1239 {
1240         struct topic_pool_entry *topic_pool_entry;
1241
1242         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1243         if (!topic_pool_entry) {
1244                 return 0;
1245         }
1246
1247         ao2_ref(topic_pool_entry, -1);
1248         return 1;
1249 }
1250
/*!
 * \brief Log use of a message type before init or after destruction.
 *
 * Only active when AST_DEVMODE is defined. Declined message types are not
 * reported.
 *
 * \param name Name of the message type accessor that was called
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
        if (stasis_message_type_declined(name)) {
                return;
        }

        ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1259
/*! \brief A multi object blob data structure to carry user event stasis messages */
struct ast_multi_object_blob {
        struct ast_json *blob;                             /*!< A blob of JSON data */
        AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*!< Vector of snapshots for each type */
};
1265
1266 /*!
1267  * \internal
1268  * \brief Destructor for \ref ast_multi_object_blob objects
1269  */
1270 static void multi_object_blob_dtor(void *obj)
1271 {
1272         struct ast_multi_object_blob *multi = obj;
1273         int type;
1274         int i;
1275
1276         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1277                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1278                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1279                 }
1280                 AST_VECTOR_FREE(&multi->snapshots[type]);
1281         }
1282         ast_json_unref(multi->blob);
1283 }
1284
1285 /*! \brief Create a stasis user event multi object blob */
1286 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1287 {
1288         int type;
1289         struct ast_multi_object_blob *multi;
1290
1291         ast_assert(blob != NULL);
1292
1293         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1294         if (!multi) {
1295                 return NULL;
1296         }
1297
1298         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1299                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1300                         ao2_ref(multi, -1);
1301
1302                         return NULL;
1303                 }
1304         }
1305
1306         multi->blob = ast_json_ref(blob);
1307
1308         return multi;
1309 }
1310
1311 /*! \brief Add an object (snapshot) to the blob */
1312 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1313         enum stasis_user_multi_object_snapshot_type type, void *object)
1314 {
1315         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1316                 ao2_cleanup(object);
1317         }
1318 }
1319
1320 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1321 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1322         struct stasis_message_type *type, struct ast_json *blob)
1323 {
1324         struct stasis_message *message;
1325         struct ast_channel_snapshot *channel_snapshot;
1326         struct ast_multi_object_blob *multi;
1327
1328         if (!type) {
1329                 return;
1330         }
1331
1332         multi = ast_multi_object_blob_create(blob);
1333         if (!multi) {
1334                 return;
1335         }
1336
1337         channel_snapshot = ast_channel_snapshot_create(chan);
1338         if (!channel_snapshot) {
1339                 ao2_ref(multi, -1);
1340                 return;
1341         }
1342
1343         /* this call steals the channel_snapshot reference */
1344         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1345
1346         message = stasis_message_create(type, multi);
1347         ao2_ref(multi, -1);
1348         if (message) {
1349                 /* app_userevent still publishes to channel */
1350                 stasis_publish(ast_channel_topic(chan), message);
1351                 ao2_ref(message, -1);
1352         }
1353 }
1354
1355 /*! \internal \brief convert multi object blob to ari json */
1356 static struct ast_json *multi_user_event_to_json(
1357         struct stasis_message *message,
1358         const struct stasis_message_sanitizer *sanitize)
1359 {
1360         struct ast_json *out;
1361         struct ast_multi_object_blob *multi = stasis_message_data(message);
1362         struct ast_json *blob = multi->blob;
1363         const struct timeval *tv = stasis_message_timestamp(message);
1364         enum stasis_user_multi_object_snapshot_type type;
1365         int i;
1366
1367         out = ast_json_object_create();
1368         if (!out) {
1369                 return NULL;
1370         }
1371
1372         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1373         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1374         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1375         ast_json_object_set(out, "userevent", ast_json_ref(blob));
1376
1377         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1378                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1379                         struct ast_json *json_object = NULL;
1380                         char *name = NULL;
1381                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1382
1383                         switch (type) {
1384                         case STASIS_UMOS_CHANNEL:
1385                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1386                                 name = "channel";
1387                                 break;
1388                         case STASIS_UMOS_BRIDGE:
1389                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1390                                 name = "bridge";
1391                                 break;
1392                         case STASIS_UMOS_ENDPOINT:
1393                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1394                                 name = "endpoint";
1395                                 break;
1396                         }
1397                         if (json_object) {
1398                                 ast_json_object_set(out, name, json_object);
1399                         }
1400                 }
1401         }
1402
1403         return out;
1404 }
1405
1406 /*! \internal \brief convert multi object blob to ami string */
1407 static struct ast_str *multi_object_blob_to_ami(void *obj)
1408 {
1409         struct ast_str *ami_str=ast_str_create(1024);
1410         struct ast_str *ami_snapshot;
1411         const struct ast_multi_object_blob *multi = obj;
1412         enum stasis_user_multi_object_snapshot_type type;
1413         int i;
1414
1415         if (!ami_str) {
1416                 return NULL;
1417         }
1418         if (!multi) {
1419                 ast_free(ami_str);
1420                 return NULL;
1421         }
1422
1423         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1424                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1425                         char *name = NULL;
1426                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1427                         ami_snapshot = NULL;
1428
1429                         if (i > 0) {
1430                                 ast_asprintf(&name, "%d", i + 1);
1431                         }
1432
1433                         switch (type) {
1434                         case STASIS_UMOS_CHANNEL:
1435                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1436                                 break;
1437
1438                         case STASIS_UMOS_BRIDGE:
1439                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1440                                 break;
1441
1442                         case STASIS_UMOS_ENDPOINT:
1443                                 /* currently not sending endpoint snapshots to AMI */
1444                                 break;
1445                         }
1446                         if (ami_snapshot) {
1447                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1448                                 ast_free(ami_snapshot);
1449                         }
1450                         ast_free(name);
1451                 }
1452         }
1453
1454         return ami_str;
1455 }
1456
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key being considered
 * \retval 1 if \a key is exactly "eventname" (case-sensitive)
 * \retval 0 for every other key
 */
static int userevent_exclusion_cb(const char *key)
{
        return strcmp("eventname", key) == 0 ? 1 : 0;
}
1465
1466 static struct ast_manager_event_blob *multi_user_event_to_ami(
1467         struct stasis_message *message)
1468 {
1469         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1470         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1471         struct ast_multi_object_blob *multi = stasis_message_data(message);
1472         const char *eventname;
1473
1474         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1475         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1476         object_string = multi_object_blob_to_ami(multi);
1477         if (!object_string || !body) {
1478                 return NULL;
1479         }
1480
1481         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1482                 "%s"
1483                 "UserEvent: %s\r\n"
1484                 "%s",
1485                 ast_str_buffer(object_string),
1486                 eventname,
1487                 ast_str_buffer(body));
1488 }
1489
/*! \brief A structure to hold global configuration-related options */
struct stasis_declined_config {
        /*! The list of message types to decline; released by stasis_declined_config_destructor() */
        struct ao2_container *declined;
};
1495
/*! \brief Threadpool configuration options */
struct stasis_threadpool_conf {
        /*! Initial size of the thread pool */
        int initial_size;
        /*! Time, in seconds, before we expire a thread */
        int idle_timeout_sec;
        /*! Maximum number of threads to allow */
        int max_size;
};
1505
/*! \brief Top-level stasis configuration: threadpool options plus declined message types */
struct stasis_config {
        /*! Thread pool configuration options; released with ast_free() by stasis_config_destructor() */
        struct stasis_threadpool_conf *threadpool_options;
        /*! Declined message types; an ao2 object released by stasis_config_destructor() */
        struct stasis_declined_config *declined_message_types;
};
1512
1513 static struct aco_type threadpool_option = {
1514         .type = ACO_GLOBAL,
1515         .name = "threadpool",
1516         .item_offset = offsetof(struct stasis_config, threadpool_options),
1517         .category = "threadpool",
1518         .category_match = ACO_WHITELIST_EXACT,
1519 };
1520
1521 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1522
1523 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1524 static struct aco_type declined_option = {
1525         .type = ACO_GLOBAL,
1526         .name = "declined_message_types",
1527         .item_offset = offsetof(struct stasis_config, declined_message_types),
1528         .category_match = ACO_WHITELIST_EXACT,
1529         .category = "declined_message_types",
1530 };
1531
1532 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1533
1534 struct aco_file stasis_conf = {
1535         .filename = "stasis.conf",
1536         .types = ACO_TYPES(&declined_option, &threadpool_option),
1537 };
1538
1539 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1540 static AO2_GLOBAL_OBJ_STATIC(globals);
1541
1542 static void *stasis_config_alloc(void);
1543
1544 /*! \brief Register information about the configs being processed by this module */
1545 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1546         .files = ACO_FILES(&stasis_conf),
1547 );
1548
1549 static void stasis_declined_config_destructor(void *obj)
1550 {
1551         struct stasis_declined_config *declined = obj;
1552
1553         ao2_cleanup(declined->declined);
1554 }
1555
1556 static void stasis_config_destructor(void *obj)
1557 {
1558         struct stasis_config *cfg = obj;
1559
1560         ao2_cleanup(cfg->declined_message_types);
1561         ast_free(cfg->threadpool_options);
1562 }
1563
1564 static void *stasis_config_alloc(void)
1565 {
1566         struct stasis_config *cfg;
1567
1568         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1569                 return NULL;
1570         }
1571
1572         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1573         if (!cfg->threadpool_options) {
1574                 ao2_ref(cfg, -1);
1575                 return NULL;
1576         }
1577
1578         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1579                 stasis_declined_config_destructor);
1580         if (!cfg->declined_message_types) {
1581                 ao2_ref(cfg, -1);
1582                 return NULL;
1583         }
1584
1585         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1586         if (!cfg->declined_message_types->declined) {
1587                 ao2_ref(cfg, -1);
1588                 return NULL;
1589         }
1590
1591         return cfg;
1592 }
1593
1594 int stasis_message_type_declined(const char *name)
1595 {
1596         struct stasis_config *cfg = ao2_global_obj_ref(globals);
1597         char *name_in_declined;
1598         int res;
1599
1600         if (!cfg || !cfg->declined_message_types) {
1601                 ao2_cleanup(cfg);
1602                 return 0;
1603         }
1604
1605         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1606         res = name_in_declined ? 1 : 0;
1607         ao2_cleanup(name_in_declined);
1608         ao2_ref(cfg, -1);
1609         if (res) {
1610                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1611         }
1612         return res;
1613 }
1614
1615 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1616 {
1617         struct stasis_declined_config *declined = obj;
1618
1619         if (ast_strlen_zero(var->value)) {
1620                 return 0;
1621         }
1622
1623         if (ast_str_container_add(declined->declined, var->value)) {
1624                 return -1;
1625         }
1626
1627         return 0;
1628 }
1629
1630 /*!
1631  * @{ \brief Define multi user event message type(s).
1632  */
1633
1634 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1635         .to_json = multi_user_event_to_json,
1636         .to_ami = multi_user_event_to_ami,
1637         );
1638
1639 /*! @} */
1640
1641 /*! \brief Cleanup function for graceful shutdowns */
1642 static void stasis_cleanup(void)
1643 {
1644         ast_threadpool_shutdown(pool);
1645         pool = NULL;
1646         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1647         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1648         aco_info_destroy(&cfg_info);
1649         ao2_global_obj_release(globals);
1650 }
1651
1652 int stasis_init(void)
1653 {
1654         struct stasis_config *cfg;
1655         int cache_init;
1656         struct ast_threadpool_options threadpool_opts = { 0, };
1657
1658         /* Be sure the types are cleaned up after the message bus */
1659         ast_register_cleanup(stasis_cleanup);
1660
1661         if (aco_info_init(&cfg_info)) {
1662                 return -1;
1663         }
1664
1665         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1666                 declined_options, "", declined_handler, 0);
1667         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1668                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1669                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1670                 INT_MAX);
1671         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1672                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1673                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1674                 INT_MAX);
1675         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1676                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1677                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1678                 INT_MAX);
1679
1680         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1681                 struct stasis_config *default_cfg = stasis_config_alloc();
1682
1683                 if (!default_cfg) {
1684                         return -1;
1685                 }
1686
1687                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1688                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1689                         ao2_ref(default_cfg, -1);
1690
1691                         return -1;
1692                 }
1693
1694                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1695                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1696                         ao2_ref(default_cfg, -1);
1697
1698                         return -1;
1699                 }
1700
1701                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1702                 ao2_global_obj_replace_unref(globals, default_cfg);
1703                 cfg = default_cfg;
1704         } else {
1705                 cfg = ao2_global_obj_ref(globals);
1706                 if (!cfg) {
1707                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1708
1709                         return -1;
1710                 }
1711         }
1712
1713         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1714         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1715         threadpool_opts.auto_increment = 1;
1716         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1717         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1718         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1719         ao2_ref(cfg, -1);
1720         if (!pool) {
1721                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1722
1723                 return -1;
1724         }
1725
1726         cache_init = stasis_cache_init();
1727         if (cache_init != 0) {
1728                 return -1;
1729         }
1730
1731         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1732                 return -1;
1733         }
1734         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1735                 return -1;
1736         }
1737
1738         return 0;
1739 }