json: Take advantage of new API's.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subscription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has been explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there is a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how they
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
/*! Capacity a new topic's subscriber vector is initialized with. */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! Bucket count used for topic pools. */
#define TOPIC_POOL_BUCKETS 57

/*! Thread pool handed to ast_threadpool_serializer() for pooled subscriptions. */
static struct ast_threadpool *pool;

STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
307 /*! \internal */
308 struct stasis_topic {
309         char *name;
310         /*! Variable length array of the subscribers */
311         AST_VECTOR(, struct stasis_subscription *) subscribers;
312
313         /*! Topics forwarding into this topic */
314         AST_VECTOR(, struct stasis_topic *) upstream_topics;
315 };
316
/*
 * Topic-side helpers needed by the subscription code further down.
 * Declared up front because topics and subscriptions are tightly coupled.
 */
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static int topic_remove_subscription(struct stasis_topic *topic,
        struct stasis_subscription *sub);
322
/*!
 * \brief Lock two topics.
 *
 * \a topic1 is locked outright; \a topic2 is then acquired with trylock,
 * running AO2_DEADLOCK_AVOIDANCE() on \a topic1 after every failed attempt.
 */
#define topic_lock_both(topic1, topic2) \
        do { \
                ao2_lock(topic1); \
                for (;;) { \
                        if (!ao2_trylock(topic2)) { \
                                break; \
                        } \
                        AO2_DEADLOCK_AVOIDANCE(topic1); \
                } \
        } while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 /*! \internal */
374 struct stasis_subscription {
375         /*! Unique ID for this subscription */
376         char uniqueid[AST_UUID_STR_LEN];
377         /*! Topic subscribed to. */
378         struct stasis_topic *topic;
379         /*! Mailbox for processing incoming messages. */
380         struct ast_taskprocessor *mailbox;
381         /*! Callback function for incoming message processing. */
382         stasis_subscription_cb callback;
383         /*! Data pointer to be handed to the callback. */
384         void *data;
385
386         /*! Condition for joining with subscription. */
387         ast_cond_t join_cond;
388         /*! Flag set when final message for sub has been received.
389          *  Be sure join_lock is held before reading/setting. */
390         int final_message_rxed;
391         /*! Flag set when final message for sub has been processed.
392          *  Be sure join_lock is held before reading/setting. */
393         int final_message_processed;
394 };
395
396 static void subscription_dtor(void *obj)
397 {
398         struct stasis_subscription *sub = obj;
399
400         /* Subscriptions need to be manually unsubscribed before destruction
401          * b/c there's a cyclic reference between topics and subscriptions */
402         ast_assert(!stasis_subscription_is_subscribed(sub));
403         /* If there are any messages in flight to this subscription; that would
404          * be bad. */
405         ast_assert(stasis_subscription_is_done(sub));
406
407         ao2_cleanup(sub->topic);
408         sub->topic = NULL;
409         ast_taskprocessor_unreference(sub->mailbox);
410         sub->mailbox = NULL;
411         ast_cond_destroy(&sub->join_cond);
412 }
413
414 /*!
415  * \brief Invoke the subscription's callback.
416  * \param sub Subscription to invoke.
417  * \param topic Topic message was published to.
418  * \param message Message to send.
419  */
420 static void subscription_invoke(struct stasis_subscription *sub,
421                                   struct stasis_message *message)
422 {
423         /* Notify that the final message has been received */
424         if (stasis_subscription_final_message(sub, message)) {
425                 ao2_lock(sub);
426                 sub->final_message_rxed = 1;
427                 ast_cond_signal(&sub->join_cond);
428                 ao2_unlock(sub);
429         }
430
431         /* Since sub is mostly immutable, no need to lock sub */
432         sub->callback(sub->data, sub, message);
433
434         /* Notify that the final message has been processed */
435         if (stasis_subscription_final_message(sub, message)) {
436                 ao2_lock(sub);
437                 sub->final_message_processed = 1;
438                 ast_cond_signal(&sub->join_cond);
439                 ao2_unlock(sub);
440         }
441 }
442
/* Forward declarations; used by internal_stasis_subscribe() and
 * stasis_unsubscribe() below. */
static void send_subscription_subscribe(struct stasis_topic *topic,
        struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic,
        struct stasis_subscription *sub);
445
/*!
 * \brief Subscription callback that ignores every message.
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
        /* Intentionally empty. */
        return;
}
449
450 struct stasis_subscription *internal_stasis_subscribe(
451         struct stasis_topic *topic,
452         stasis_subscription_cb callback,
453         void *data,
454         int needs_mailbox,
455         int use_thread_pool)
456 {
457         struct stasis_subscription *sub;
458
459         if (!topic) {
460                 return NULL;
461         }
462
463         /* The ao2 lock is used for join_cond. */
464         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
465         if (!sub) {
466                 return NULL;
467         }
468         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
469
470         if (needs_mailbox) {
471                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
472
473                 /* Create name with seq number appended. */
474                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
475                         use_thread_pool ? 'p' : 'm',
476                         stasis_topic_name(topic));
477
478                 /*
479                  * With a small number of subscribers, a thread-per-sub is
480                  * acceptable. For a large number of subscribers, a thread
481                  * pool should be used.
482                  */
483                 if (use_thread_pool) {
484                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
485                 } else {
486                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
487                 }
488                 if (!sub->mailbox) {
489                         ao2_ref(sub, -1);
490
491                         return NULL;
492                 }
493                 ast_taskprocessor_set_local(sub->mailbox, sub);
494                 /* Taskprocessor has a reference */
495                 ao2_ref(sub, +1);
496         }
497
498         ao2_ref(topic, +1);
499         sub->topic = topic;
500         sub->callback = callback;
501         sub->data = data;
502         ast_cond_init(&sub->join_cond, NULL);
503
504         if (topic_add_subscription(topic, sub) != 0) {
505                 ao2_ref(sub, -1);
506
507                 return NULL;
508         }
509         send_subscription_subscribe(topic, sub);
510
511         return sub;
512 }
513
514 struct stasis_subscription *stasis_subscribe(
515         struct stasis_topic *topic,
516         stasis_subscription_cb callback,
517         void *data)
518 {
519         return internal_stasis_subscribe(topic, callback, data, 1, 0);
520 }
521
522 struct stasis_subscription *stasis_subscribe_pool(
523         struct stasis_topic *topic,
524         stasis_subscription_cb callback,
525         void *data)
526 {
527         return internal_stasis_subscribe(topic, callback, data, 1, 1);
528 }
529
/*!
 * \internal
 * \brief Taskprocessor task that drops one reference to a subscription.
 * \param data The stasis_subscription to release.
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
        struct stasis_subscription *released = data;

        ao2_cleanup(released);

        return 0;
}
536
537 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
538 {
539         /* The subscription may be the last ref to this topic. Hold
540          * the topic ref open until after the unlock. */
541         struct stasis_topic *topic;
542
543         if (!sub) {
544                 return NULL;
545         }
546
547         topic = ao2_bump(sub->topic);
548
549         /* We have to remove the subscription first, to ensure the unsubscribe
550          * is the final message */
551         if (topic_remove_subscription(sub->topic, sub) != 0) {
552                 ast_log(LOG_ERROR,
553                         "Internal error: subscription has invalid topic\n");
554                 ao2_cleanup(topic);
555
556                 return NULL;
557         }
558
559         /* Now let everyone know about the unsubscribe */
560         send_subscription_unsubscribe(topic, sub);
561
562         /* When all that's done, remove the ref the mailbox has on the sub */
563         if (sub->mailbox) {
564                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
565         }
566
567         /* Unsubscribing unrefs the subscription */
568         ao2_cleanup(sub);
569         ao2_cleanup(topic);
570
571         return NULL;
572 }
573
574 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
575         long low_water, long high_water)
576 {
577         int res = -1;
578
579         if (subscription) {
580                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
581                         low_water, high_water);
582         }
583         return res;
584 }
585
586 void stasis_subscription_join(struct stasis_subscription *subscription)
587 {
588         if (subscription) {
589                 ao2_lock(subscription);
590                 /* Wait until the processed flag has been set */
591                 while (!subscription->final_message_processed) {
592                         ast_cond_wait(&subscription->join_cond,
593                                 ao2_object_get_lockaddr(subscription));
594                 }
595                 ao2_unlock(subscription);
596         }
597 }
598
599 int stasis_subscription_is_done(struct stasis_subscription *subscription)
600 {
601         if (subscription) {
602                 int ret;
603
604                 ao2_lock(subscription);
605                 ret = subscription->final_message_rxed;
606                 ao2_unlock(subscription);
607
608                 return ret;
609         }
610
611         /* Null subscription is about as done as you can get */
612         return 1;
613 }
614
615 struct stasis_subscription *stasis_unsubscribe_and_join(
616         struct stasis_subscription *subscription)
617 {
618         if (!subscription) {
619                 return NULL;
620         }
621
622         /* Bump refcount to hold it past the unsubscribe */
623         ao2_ref(subscription, +1);
624         stasis_unsubscribe(subscription);
625         stasis_subscription_join(subscription);
626         /* Now decrement the refcount back */
627         ao2_cleanup(subscription);
628         return NULL;
629 }
630
631 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
632 {
633         if (sub) {
634                 size_t i;
635                 struct stasis_topic *topic = sub->topic;
636
637                 ao2_lock(topic);
638                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
639                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
640                                 ao2_unlock(topic);
641                                 return 1;
642                         }
643                 }
644                 ao2_unlock(topic);
645         }
646
647         return 0;
648 }
649
/*!
 * \brief Get the unique ID string of a subscription.
 *
 * \param sub Subscription to query. It is dereferenced without a NULL
 *            check, so it must not be NULL.
 *
 * \return The subscription's \c uniqueid member; the string is not copied.
 */
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
{
	return sub->uniqueid;
}
654
655 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
656 {
657         struct stasis_subscription_change *change;
658
659         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
660                 return 0;
661         }
662
663         change = stasis_message_data(msg);
664         if (strcmp("Unsubscribe", change->description)) {
665                 return 0;
666         }
667
668         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
669                 return 0;
670         }
671
672         return 1;
673 }
674
675 /*!
676  * \brief Add a subscriber to a topic.
677  * \param topic Topic
678  * \param sub Subscriber
679  * \return 0 on success
680  * \return Non-zero on error
681  */
682 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
683 {
684         size_t idx;
685
686         ao2_lock(topic);
687         /* The reference from the topic to the subscription is shared with
688          * the owner of the subscription, which will explicitly unsubscribe
689          * to release it.
690          *
691          * If we bumped the refcount here, the owner would have to unsubscribe
692          * and cleanup, which is a bit awkward. */
693         AST_VECTOR_APPEND(&topic->subscribers, sub);
694
695         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
696                 topic_add_subscription(
697                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
698         }
699         ao2_unlock(topic);
700
701         return 0;
702 }
703
704 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
705 {
706         size_t idx;
707         int res;
708
709         ao2_lock(topic);
710         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
711                 topic_remove_subscription(
712                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
713         }
714         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
715                 AST_VECTOR_ELEM_CLEANUP_NOOP);
716         ao2_unlock(topic);
717
718         return res;
719 }
720
721 /*!
722  * \internal \brief Dispatch a message to a subscriber asynchronously
723  * \param local \ref ast_taskprocessor_local object
724  * \return 0
725  */
726 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
727 {
728         struct stasis_subscription *sub = local->local_data;
729         struct stasis_message *message = local->data;
730
731         subscription_invoke(sub, message);
732         ao2_cleanup(message);
733
734         return 0;
735 }
736
/*!
 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
 * a published message to a subscriber
 *
 * dispatch_message() places one of these on its stack and waits on it;
 * dispatch_exec_sync() marks it complete once the subscriber was invoked.
 */
struct sync_task_data {
	/*! Guards \c complete and is the mutex used with \c cond */
	ast_mutex_t lock;
	/*! Signalled by dispatch_exec_sync() after the subscriber has run */
	ast_cond_t cond;
	/*! Non-zero once the message has been handled */
	int complete;
	/*! The stasis message to deliver (read as a struct stasis_message *) */
	void *task_data;
};
747
748 /*!
749  * \internal \brief Dispatch a message to a subscriber synchronously
750  * \param local \ref ast_taskprocessor_local object
751  * \return 0
752  */
753 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
754 {
755         struct stasis_subscription *sub = local->local_data;
756         struct sync_task_data *std = local->data;
757         struct stasis_message *message = std->task_data;
758
759         subscription_invoke(sub, message);
760         ao2_cleanup(message);
761
762         ast_mutex_lock(&std->lock);
763         std->complete = 1;
764         ast_cond_signal(&std->cond);
765         ast_mutex_unlock(&std->lock);
766
767         return 0;
768 }
769
770 /*!
771  * \internal \brief Dispatch a message to a subscriber
772  * \param sub The subscriber to dispatch to
773  * \param message The message to send
774  * \param synchronous If non-zero, synchronize on the subscriber receiving
775  * the message
776  */
777 static void dispatch_message(struct stasis_subscription *sub,
778         struct stasis_message *message,
779         int synchronous)
780 {
781         if (!sub->mailbox) {
782                 /* Dispatch directly */
783                 subscription_invoke(sub, message);
784                 return;
785         }
786
787         /* Bump the message for the taskprocessor push. This will get de-ref'd
788          * by the task processor callback.
789          */
790         ao2_bump(message);
791         if (!synchronous) {
792                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
793                         /* Push failed; ugh. */
794                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
795                         ao2_cleanup(message);
796                 }
797         } else {
798                 struct sync_task_data std;
799
800                 ast_mutex_init(&std.lock);
801                 ast_cond_init(&std.cond, NULL);
802                 std.complete = 0;
803                 std.task_data = message;
804
805                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
806                         /* Push failed; ugh. */
807                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
808                         ao2_cleanup(message);
809                         ast_mutex_destroy(&std.lock);
810                         ast_cond_destroy(&std.cond);
811                         return;
812                 }
813
814                 ast_mutex_lock(&std.lock);
815                 while (!std.complete) {
816                         ast_cond_wait(&std.cond, &std.lock);
817                 }
818                 ast_mutex_unlock(&std.lock);
819
820                 ast_mutex_destroy(&std.lock);
821                 ast_cond_destroy(&std.cond);
822         }
823 }
824
825 /*!
826  * \internal \brief Publish a message to a topic's subscribers
827  * \brief topic The topic to publish to
828  * \brief message The message to publish
829  * \brief sync_sub An optional subscriber of the topic to publish synchronously
830  * to
831  */
832 static void publish_msg(struct stasis_topic *topic,
833         struct stasis_message *message, struct stasis_subscription *sync_sub)
834 {
835         size_t i;
836
837         ast_assert(topic != NULL);
838         ast_assert(message != NULL);
839
840         /*
841          * The topic may be unref'ed by the subscription invocation.
842          * Make sure we hold onto a reference while dispatching.
843          */
844         ao2_ref(topic, +1);
845         ao2_lock(topic);
846         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
847                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
848
849                 ast_assert(sub != NULL);
850
851                 dispatch_message(sub, message, (sub == sync_sub));
852         }
853         ao2_unlock(topic);
854         ao2_ref(topic, -1);
855 }
856
/*!
 * \brief Publish a message to all subscribers of a topic.
 *
 * Thin wrapper around publish_msg() with no synchronous subscriber.
 *
 * \param topic Topic to publish to
 * \param message Message to publish
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
	publish_msg(topic, message, NULL);
}
861
/*!
 * \brief Publish a message to a subscription's topic, synchronizing on
 * that one subscription.
 *
 * The message goes to every subscriber of \a sub's topic; only the dispatch
 * to \a sub itself is synchronous.
 *
 * \param sub Subscription to synchronize on; must not be NULL
 * \param message Message to publish
 */
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
{
	ast_assert(sub != NULL);

	publish_msg(sub->topic, message, sub);
}
868
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * In cases where both the \a from_topic and \a to_topic need to be locked,
 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
 *
 * Both members hold a reference that forward_dtor() releases. They are left
 * NULL for a forward from a topic to itself (see stasis_forward_all()).
 */
struct stasis_forward {
	/*! Originating topic */
	struct stasis_topic *from_topic;
	/*! Destination topic */
	struct stasis_topic *to_topic;
};
883
884 static void forward_dtor(void *obj)
885 {
886         struct stasis_forward *forward = obj;
887
888         ao2_cleanup(forward->from_topic);
889         forward->from_topic = NULL;
890         ao2_cleanup(forward->to_topic);
891         forward->to_topic = NULL;
892 }
893
894 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
895 {
896         int idx;
897         struct stasis_topic *from;
898         struct stasis_topic *to;
899
900         if (!forward) {
901                 return NULL;
902         }
903
904         from = forward->from_topic;
905         to = forward->to_topic;
906
907         if (from && to) {
908                 topic_lock_both(to, from);
909                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
910                         AST_VECTOR_ELEM_CLEANUP_NOOP);
911
912                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
913                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
914                 }
915                 ao2_unlock(from);
916                 ao2_unlock(to);
917         }
918
919         ao2_cleanup(forward);
920
921         return NULL;
922 }
923
924 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
925         struct stasis_topic *to_topic)
926 {
927         int res;
928         size_t idx;
929         struct stasis_forward *forward;
930
931         if (!from_topic || !to_topic) {
932                 return NULL;
933         }
934
935         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
936         if (!forward) {
937                 return NULL;
938         }
939
940         /* Forwards to ourselves are implicit. */
941         if (to_topic == from_topic) {
942                 return forward;
943         }
944
945         forward->from_topic = ao2_bump(from_topic);
946         forward->to_topic = ao2_bump(to_topic);
947
948         topic_lock_both(to_topic, from_topic);
949         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
950         if (res != 0) {
951                 ao2_unlock(from_topic);
952                 ao2_unlock(to_topic);
953                 ao2_ref(forward, -1);
954                 return NULL;
955         }
956
957         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
958                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
959         }
960         ao2_unlock(from_topic);
961         ao2_unlock(to_topic);
962
963         return forward;
964 }
965
966 static void subscription_change_dtor(void *obj)
967 {
968         struct stasis_subscription_change *change = obj;
969
970         ast_string_field_free_memory(change);
971         ao2_cleanup(change->topic);
972 }
973
974 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
975 {
976         struct stasis_subscription_change *change;
977
978         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
979         if (!change || ast_string_field_init(change, 128)) {
980                 ao2_cleanup(change);
981                 return NULL;
982         }
983
984         ast_string_field_set(change, uniqueid, uniqueid);
985         ast_string_field_set(change, description, description);
986         ao2_ref(topic, +1);
987         change->topic = topic;
988
989         return change;
990 }
991
992 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
993 {
994         struct stasis_subscription_change *change;
995         struct stasis_message *msg;
996
997         /* This assumes that we have already unsubscribed */
998         ast_assert(stasis_subscription_is_subscribed(sub));
999
1000         if (!stasis_subscription_change_type()) {
1001                 return;
1002         }
1003
1004         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1005         if (!change) {
1006                 return;
1007         }
1008
1009         msg = stasis_message_create(stasis_subscription_change_type(), change);
1010         if (!msg) {
1011                 ao2_cleanup(change);
1012                 return;
1013         }
1014
1015         stasis_publish(topic, msg);
1016         ao2_cleanup(msg);
1017         ao2_cleanup(change);
1018 }
1019
1020 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1021         struct stasis_subscription *sub)
1022 {
1023         struct stasis_subscription_change *change;
1024         struct stasis_message *msg;
1025
1026         /* This assumes that we have already unsubscribed */
1027         ast_assert(!stasis_subscription_is_subscribed(sub));
1028
1029         if (!stasis_subscription_change_type()) {
1030                 return;
1031         }
1032
1033         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1034         if (!change) {
1035                 return;
1036         }
1037
1038         msg = stasis_message_create(stasis_subscription_change_type(), change);
1039         if (!msg) {
1040                 ao2_cleanup(change);
1041                 return;
1042         }
1043
1044         stasis_publish(topic, msg);
1045
1046         /* Now we have to dispatch to the subscription itself */
1047         dispatch_message(sub, msg, 0);
1048
1049         ao2_cleanup(msg);
1050         ao2_cleanup(change);
1051 }
1052
/*!
 * \brief One pooled topic together with its forward to the pool's topic.
 *
 * Entries live in a stasis_topic_pool's container and are looked up by the
 * name of \c topic.
 */
struct topic_pool_entry {
	/*! Forward from \c topic to the pool's topic; cancelled in the destructor */
	struct stasis_forward *forward;
	/*! The pooled topic; its reference is released in the destructor */
	struct stasis_topic *topic;
};
1057
1058 static void topic_pool_entry_dtor(void *obj)
1059 {
1060         struct topic_pool_entry *entry = obj;
1061
1062         entry->forward = stasis_forward_cancel(entry->forward);
1063         ao2_cleanup(entry->topic);
1064         entry->topic = NULL;
1065 }
1066
1067 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1068 {
1069         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1070                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1071 }
1072
/*!
 * \brief A pool of named topics that all forward to one common topic.
 */
struct stasis_topic_pool {
	/*! Container of \ref topic_pool_entry objects, hashed and compared case-insensitively by topic name */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic is forwarded to; a reference is held */
	struct stasis_topic *pool_topic;
};
1077
1078 static void topic_pool_dtor(void *obj)
1079 {
1080         struct stasis_topic_pool *pool = obj;
1081
1082 #ifdef AO2_DEBUG
1083         {
1084                 char *container_name =
1085                         ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1086                 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1087                 ao2_container_unregister(container_name);
1088         }
1089 #endif
1090
1091         ao2_cleanup(pool->pool_container);
1092         pool->pool_container = NULL;
1093         ao2_cleanup(pool->pool_topic);
1094         pool->pool_topic = NULL;
1095 }
1096
1097 static int topic_pool_entry_hash(const void *obj, const int flags)
1098 {
1099         const struct topic_pool_entry *object;
1100         const char *key;
1101
1102         switch (flags & OBJ_SEARCH_MASK) {
1103         case OBJ_SEARCH_KEY:
1104                 key = obj;
1105                 break;
1106         case OBJ_SEARCH_OBJECT:
1107                 object = obj;
1108                 key = stasis_topic_name(object->topic);
1109                 break;
1110         default:
1111                 /* Hash can only work on something with a full key. */
1112                 ast_assert(0);
1113                 return 0;
1114         }
1115         return ast_str_case_hash(key);
1116 }
1117
1118 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1119 {
1120         const struct topic_pool_entry *object_left = obj;
1121         const struct topic_pool_entry *object_right = arg;
1122         const char *right_key = arg;
1123         int cmp;
1124
1125         switch (flags & OBJ_SEARCH_MASK) {
1126         case OBJ_SEARCH_OBJECT:
1127                 right_key = stasis_topic_name(object_right->topic);
1128                 /* Fall through */
1129         case OBJ_SEARCH_KEY:
1130                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1131                 break;
1132         case OBJ_SEARCH_PARTIAL_KEY:
1133                 /* Not supported by container */
1134                 ast_assert(0);
1135                 cmp = -1;
1136                 break;
1137         default:
1138                 /*
1139                  * What arg points to is specific to this traversal callback
1140                  * and has no special meaning to astobj2.
1141                  */
1142                 cmp = 0;
1143                 break;
1144         }
1145         if (cmp) {
1146                 return 0;
1147         }
1148         /*
1149          * At this point the traversal callback is identical to a sorted
1150          * container.
1151          */
1152         return CMP_MATCH;
1153 }
1154
#ifdef AO2_DEBUG
/*!
 * \internal \brief Print callback registered with the pool container
 *
 * Prints the name of the entry's topic; a NULL entry prints nothing.
 */
static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
	struct topic_pool_entry *pooled = v_obj;

	if (pooled) {
		prnt(where, "%s", stasis_topic_name(pooled->topic));
	}
}
#endif
1166
1167 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1168 {
1169         struct stasis_topic_pool *pool;
1170
1171         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1172         if (!pool) {
1173                 return NULL;
1174         }
1175
1176         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1177                 topic_pool_entry_hash, topic_pool_entry_cmp);
1178         if (!pool->pool_container) {
1179                 ao2_cleanup(pool);
1180                 return NULL;
1181         }
1182
1183 #ifdef AO2_DEBUG
1184         {
1185                 char *container_name =
1186                         ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1187                 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1188                 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1189         }
1190 #endif
1191
1192         ao2_ref(pooled_topic, +1);
1193         pool->pool_topic = pooled_topic;
1194
1195         return pool;
1196 }
1197
/*!
 * \brief Remove a topic from a topic pool by name.
 *
 * Unlinks the matching entry from the pool's container. OBJ_NODATA means
 * nothing is returned from the lookup; a name that is not present is simply
 * ignored.
 *
 * \param pool Topic pool to remove from
 * \param topic_name Name of the topic to remove
 */
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
{
	ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
}
1202
/*!
 * \brief Find a topic in the pool by name, creating it if it does not exist.
 *
 * A newly created topic is forwarded to the pool's topic and linked into
 * the pool's container. The lookup and the creation both happen under the
 * container lock.
 *
 * \param pool Topic pool to search
 * \param topic_name Name of the topic to find or create
 *
 * \return The pooled topic. No reference is added for the caller; the
 *         pointer is the one owned by the pool entry.
 * \retval NULL if allocation, topic creation, forwarding, or linking fails
 */
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
{
	/* Local entry reference; released automatically on every return path */
	RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
	/* Container lock held for the rest of the function */
	SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);

	/* OBJ_NOLOCK: the container is already locked above */
	topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
	if (topic_pool_entry) {
		return topic_pool_entry->topic;
	}

	topic_pool_entry = topic_pool_entry_alloc();
	if (!topic_pool_entry) {
		return NULL;
	}

	topic_pool_entry->topic = stasis_topic_create(topic_name);
	if (!topic_pool_entry->topic) {
		return NULL;
	}

	/* Everything published on the new topic also reaches the pool's topic */
	topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
	if (!topic_pool_entry->forward) {
		return NULL;
	}

	if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
		return NULL;
	}

	return topic_pool_entry->topic;
}
1234
1235 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1236 {
1237         struct topic_pool_entry *topic_pool_entry;
1238
1239         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1240         if (!topic_pool_entry) {
1241                 return 0;
1242         }
1243
1244         ao2_ref(topic_pool_entry, -1);
1245         return 1;
1246 }
1247
/*!
 * \brief Log use of a message type accessor outside the type's lifetime.
 *
 * Only does anything when built with AST_DEVMODE; otherwise the body is
 * empty. Names for which stasis_message_type_declined() returns true are
 * not logged.
 *
 * \param name Name of the message type accessor that was called
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
	if (!stasis_message_type_declined(name)) {
		ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
	}
#endif
}
1256
/*!
 * \brief A multi object blob data structure to carry user event stasis messages
 *
 * The blob holds a reference on \c blob and on every snapshot stored in
 * \c snapshots; all are released by multi_object_blob_dtor().
 */
struct ast_multi_object_blob {
	struct ast_json *blob;                             /*!< A blob of JSON data */
	AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*!< Vector of snapshots for each type */
};
1262
1263 /*!
1264  * \internal
1265  * \brief Destructor for \ref ast_multi_object_blob objects
1266  */
1267 static void multi_object_blob_dtor(void *obj)
1268 {
1269         struct ast_multi_object_blob *multi = obj;
1270         int type;
1271         int i;
1272
1273         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1274                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1275                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1276                 }
1277                 AST_VECTOR_FREE(&multi->snapshots[type]);
1278         }
1279         ast_json_unref(multi->blob);
1280 }
1281
1282 /*! \brief Create a stasis user event multi object blob */
1283 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1284 {
1285         int type;
1286         struct ast_multi_object_blob *multi;
1287
1288         ast_assert(blob != NULL);
1289
1290         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1291         if (!multi) {
1292                 return NULL;
1293         }
1294
1295         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1296                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1297                         ao2_ref(multi, -1);
1298
1299                         return NULL;
1300                 }
1301         }
1302
1303         multi->blob = ast_json_ref(blob);
1304
1305         return multi;
1306 }
1307
1308 /*! \brief Add an object (snapshot) to the blob */
1309 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1310         enum stasis_user_multi_object_snapshot_type type, void *object)
1311 {
1312         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1313                 ao2_cleanup(object);
1314         }
1315 }
1316
1317 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1318 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1319         struct stasis_message_type *type, struct ast_json *blob)
1320 {
1321         struct stasis_message *message;
1322         struct ast_channel_snapshot *channel_snapshot;
1323         struct ast_multi_object_blob *multi;
1324
1325         if (!type) {
1326                 return;
1327         }
1328
1329         multi = ast_multi_object_blob_create(blob);
1330         if (!multi) {
1331                 return;
1332         }
1333
1334         channel_snapshot = ast_channel_snapshot_create(chan);
1335         if (!channel_snapshot) {
1336                 ao2_ref(multi, -1);
1337                 return;
1338         }
1339
1340         /* this call steals the channel_snapshot reference */
1341         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1342
1343         message = stasis_message_create(type, multi);
1344         ao2_ref(multi, -1);
1345         if (message) {
1346                 /* app_userevent still publishes to channel */
1347                 stasis_publish(ast_channel_topic(chan), message);
1348                 ao2_ref(message, -1);
1349         }
1350 }
1351
1352 /*! \internal \brief convert multi object blob to ari json */
1353 static struct ast_json *multi_user_event_to_json(
1354         struct stasis_message *message,
1355         const struct stasis_message_sanitizer *sanitize)
1356 {
1357         struct ast_json *out;
1358         struct ast_multi_object_blob *multi = stasis_message_data(message);
1359         struct ast_json *blob = multi->blob;
1360         const struct timeval *tv = stasis_message_timestamp(message);
1361         enum stasis_user_multi_object_snapshot_type type;
1362         int i;
1363
1364         out = ast_json_object_create();
1365         if (!out) {
1366                 return NULL;
1367         }
1368
1369         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1370         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1371         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1372         ast_json_object_set(out, "userevent", ast_json_ref(blob));
1373
1374         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1375                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1376                         struct ast_json *json_object = NULL;
1377                         char *name = NULL;
1378                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1379
1380                         switch (type) {
1381                         case STASIS_UMOS_CHANNEL:
1382                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1383                                 name = "channel";
1384                                 break;
1385                         case STASIS_UMOS_BRIDGE:
1386                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1387                                 name = "bridge";
1388                                 break;
1389                         case STASIS_UMOS_ENDPOINT:
1390                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1391                                 name = "endpoint";
1392                                 break;
1393                         }
1394                         if (json_object) {
1395                                 ast_json_object_set(out, name, json_object);
1396                         }
1397                 }
1398         }
1399
1400         return out;
1401 }
1402
1403 /*! \internal \brief convert multi object blob to ami string */
1404 static struct ast_str *multi_object_blob_to_ami(void *obj)
1405 {
1406         struct ast_str *ami_str=ast_str_create(1024);
1407         struct ast_str *ami_snapshot;
1408         const struct ast_multi_object_blob *multi = obj;
1409         enum stasis_user_multi_object_snapshot_type type;
1410         int i;
1411
1412         if (!ami_str) {
1413                 return NULL;
1414         }
1415         if (!multi) {
1416                 ast_free(ami_str);
1417                 return NULL;
1418         }
1419
1420         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1421                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1422                         char *name = NULL;
1423                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1424                         ami_snapshot = NULL;
1425
1426                         if (i > 0) {
1427                                 ast_asprintf(&name, "%d", i + 1);
1428                         }
1429
1430                         switch (type) {
1431                         case STASIS_UMOS_CHANNEL:
1432                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1433                                 break;
1434
1435                         case STASIS_UMOS_BRIDGE:
1436                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1437                                 break;
1438
1439                         case STASIS_UMOS_ENDPOINT:
1440                                 /* currently not sending endpoint snapshots to AMI */
1441                                 break;
1442                         }
1443                         if (ami_snapshot) {
1444                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1445                                 ast_free(ami_snapshot);
1446                         }
1447                         ast_free(name);
1448                 }
1449         }
1450
1451         return ami_str;
1452 }
1453
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key being considered
 *
 * \retval 1 if \a key is exactly "eventname" (exclude it)
 * \retval 0 for any other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0;
}
1462
1463 static struct ast_manager_event_blob *multi_user_event_to_ami(
1464         struct stasis_message *message)
1465 {
1466         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1467         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1468         struct ast_multi_object_blob *multi = stasis_message_data(message);
1469         const char *eventname;
1470
1471         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1472         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1473         object_string = multi_object_blob_to_ami(multi);
1474         if (!object_string || !body) {
1475                 return NULL;
1476         }
1477
1478         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1479                 "%s"
1480                 "UserEvent: %s\r\n"
1481                 "%s",
1482                 ast_str_buffer(object_string),
1483                 eventname,
1484                 ast_str_buffer(body));
1485 }
1486
/*! \brief A structure to hold global configuration-related options */
struct stasis_declined_config {
	/*! The list of message types to decline; released by stasis_declined_config_destructor() */
	struct ao2_container *declined;
};
1492
/*! \brief Threadpool configuration options */
struct stasis_threadpool_conf {
	/*! Initial size of the thread pool */
	int initial_size;
	/*! Time, in seconds, before we expire a thread */
	int idle_timeout_sec;
	/*! Maximum number of threads to allow */
	int max_size;
};
1502
/*! \brief Top-level stasis configuration object, held in the \c globals container */
struct stasis_config {
	/*! Thread pool configuration options; plain allocation freed with ast_free() */
	struct stasis_threadpool_conf *threadpool_options;
	/*! Declined message types; ao2 object released with ao2_cleanup() */
	struct stasis_declined_config *declined_message_types;
};
1509
/*! \brief An aco_type structure to link the "threadpool" category to the stasis_threadpool_conf type */
static struct aco_type threadpool_option = {
	.type = ACO_GLOBAL,
	.name = "threadpool",
	/* Options are stored through stasis_config's threadpool_options member */
	.item_offset = offsetof(struct stasis_config, threadpool_options),
	.category = "threadpool",
	.category_match = ACO_WHITELIST_EXACT,
};

/*! \brief Type list containing only \c threadpool_option */
static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1519
1520 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1521 static struct aco_type declined_option = {
1522         .type = ACO_GLOBAL,
1523         .name = "declined_message_types",
1524         .item_offset = offsetof(struct stasis_config, declined_message_types),
1525         .category_match = ACO_WHITELIST_EXACT,
1526         .category = "declined_message_types",
1527 };
1528
1529 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1530
1531 struct aco_file stasis_conf = {
1532         .filename = "stasis.conf",
1533         .types = ACO_TYPES(&declined_option, &threadpool_option),
1534 };
1535
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
static AO2_GLOBAL_OBJ_STATIC(globals);

/* Forward declaration: referenced by CONFIG_INFO_CORE() below, defined later in this file. */
static void *stasis_config_alloc(void);

/*!
 * \brief Register information about the configs being processed by this module
 *
 * Ties together the "stasis" module name, the cfg_info descriptor, the
 * globals holder, the stasis_config_alloc() allocator and the stasis.conf
 * file description.
 */
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
        .files = ACO_FILES(&stasis_conf),
);
1545
1546 static void stasis_declined_config_destructor(void *obj)
1547 {
1548         struct stasis_declined_config *declined = obj;
1549
1550         ao2_cleanup(declined->declined);
1551 }
1552
1553 static void stasis_config_destructor(void *obj)
1554 {
1555         struct stasis_config *cfg = obj;
1556
1557         ao2_cleanup(cfg->declined_message_types);
1558         ast_free(cfg->threadpool_options);
1559 }
1560
1561 static void *stasis_config_alloc(void)
1562 {
1563         struct stasis_config *cfg;
1564
1565         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1566                 return NULL;
1567         }
1568
1569         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1570         if (!cfg->threadpool_options) {
1571                 ao2_ref(cfg, -1);
1572                 return NULL;
1573         }
1574
1575         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1576                 stasis_declined_config_destructor);
1577         if (!cfg->declined_message_types) {
1578                 ao2_ref(cfg, -1);
1579                 return NULL;
1580         }
1581
1582         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1583         if (!cfg->declined_message_types->declined) {
1584                 ao2_ref(cfg, -1);
1585                 return NULL;
1586         }
1587
1588         return cfg;
1589 }
1590
1591 int stasis_message_type_declined(const char *name)
1592 {
1593         struct stasis_config *cfg = ao2_global_obj_ref(globals);
1594         char *name_in_declined;
1595         int res;
1596
1597         if (!cfg || !cfg->declined_message_types) {
1598                 ao2_cleanup(cfg);
1599                 return 0;
1600         }
1601
1602         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1603         res = name_in_declined ? 1 : 0;
1604         ao2_cleanup(name_in_declined);
1605         ao2_ref(cfg, -1);
1606         if (res) {
1607                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1608         }
1609         return res;
1610 }
1611
1612 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1613 {
1614         struct stasis_declined_config *declined = obj;
1615
1616         if (ast_strlen_zero(var->value)) {
1617                 return 0;
1618         }
1619
1620         if (ast_str_container_add(declined->declined, var->value)) {
1621                 return -1;
1622         }
1623
1624         return 0;
1625 }
1626
/*!
 * @{ \brief Define the multi user event message type.
 *
 * multi_user_event_to_json() and multi_user_event_to_ami() are installed as
 * the type's \c to_json and \c to_ami callbacks.
 */

STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
        .to_json = multi_user_event_to_json,
        .to_ami = multi_user_event_to_ami,
        );

/*! @} */
1637
/*!
 * \brief Cleanup function for graceful shutdowns
 *
 * Registered by stasis_init() through ast_register_cleanup().
 */
static void stasis_cleanup(void)
{
        /* Shut the threadpool down and clear the pointer so it is not left dangling */
        ast_threadpool_shutdown(pool);
        pool = NULL;
        /* Clean up the message types that stasis_init() initializes */
        STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
        STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
        /* Tear down the configuration registration and drop the global config reference */
        aco_info_destroy(&cfg_info);
        ao2_global_obj_release(globals);
}
1648
1649 int stasis_init(void)
1650 {
1651         struct stasis_config *cfg;
1652         int cache_init;
1653         struct ast_threadpool_options threadpool_opts = { 0, };
1654
1655         /* Be sure the types are cleaned up after the message bus */
1656         ast_register_cleanup(stasis_cleanup);
1657
1658         if (aco_info_init(&cfg_info)) {
1659                 return -1;
1660         }
1661
1662         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1663                 declined_options, "", declined_handler, 0);
1664         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1665                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1666                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1667                 INT_MAX);
1668         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1669                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1670                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1671                 INT_MAX);
1672         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1673                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1674                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1675                 INT_MAX);
1676
1677         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1678                 struct stasis_config *default_cfg = stasis_config_alloc();
1679
1680                 if (!default_cfg) {
1681                         return -1;
1682                 }
1683
1684                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1685                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1686                         ao2_ref(default_cfg, -1);
1687
1688                         return -1;
1689                 }
1690
1691                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1692                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1693                         ao2_ref(default_cfg, -1);
1694
1695                         return -1;
1696                 }
1697
1698                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1699                 ao2_global_obj_replace_unref(globals, default_cfg);
1700                 cfg = default_cfg;
1701         } else {
1702                 cfg = ao2_global_obj_ref(globals);
1703                 if (!cfg) {
1704                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1705
1706                         return -1;
1707                 }
1708         }
1709
1710         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1711         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1712         threadpool_opts.auto_increment = 1;
1713         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1714         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1715         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1716         ao2_ref(cfg, -1);
1717         if (!pool) {
1718                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1719
1720                 return -1;
1721         }
1722
1723         cache_init = stasis_cache_init();
1724         if (cache_init != 0) {
1725                 return -1;
1726         }
1727
1728         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1729                 return -1;
1730         }
1731         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1732                 return -1;
1733         }
1734
1735         return 0;
1736 }