4ce70523320f66410058fb35cd0c86136f20ecfa
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44 #include "asterisk/cli.h"
45
46 /*** DOCUMENTATION
47         <managerEvent language="en_US" name="UserEvent">
48                 <managerEventInstance class="EVENT_FLAG_USER">
49                         <synopsis>A user defined event raised from the dialplan.</synopsis>
50                         <syntax>
51                                 <channel_snapshot/>
52                                 <parameter name="UserEvent">
53                                         <para>The event name, as specified in the dialplan.</para>
54                                 </parameter>
55                         </syntax>
56                         <description>
57                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
58                         </description>
59                         <see-also>
60                                 <ref type="application">UserEvent</ref>
61                                 <ref type="managerEvent">UserEvent</ref>
62                         </see-also>
63                 </managerEventInstance>
64         </managerEvent>
65         <configInfo name="stasis" language="en_US">
66                 <configFile name="stasis.conf">
67                         <configObject name="threadpool">
68                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69                                 <configOption name="initial_size" default="5">
70                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71                                 </configOption>
72                                 <configOption name="idle_timeout_sec" default="20">
73                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74                                 </configOption>
75                                 <configOption name="max_size" default="50">
76                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
77                                 </configOption>
78                         </configObject>
79                         <configObject name="declined_message_types">
80                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
81                                 <configOption name="decline">
82                                         <synopsis>The message type to decline.</synopsis>
83                                         <description>
84                                                 <para>This configuration option defines the name of the Stasis
85                                                 message type that Asterisk is forbidden from creating and can be
86                                                 specified as many times as necessary to achieve the desired result.</para>
87                                                 <enumlist>
88                                                         <enum name="stasis_app_recording_snapshot_type" />
89                                                         <enum name="stasis_app_playback_snapshot_type" />
90                                                         <enum name="stasis_test_message_type" />
91                                                         <enum name="confbridge_start_type" />
92                                                         <enum name="confbridge_end_type" />
93                                                         <enum name="confbridge_join_type" />
94                                                         <enum name="confbridge_leave_type" />
95                                                         <enum name="confbridge_start_record_type" />
96                                                         <enum name="confbridge_stop_record_type" />
97                                                         <enum name="confbridge_mute_type" />
98                                                         <enum name="confbridge_unmute_type" />
99                                                         <enum name="confbridge_talking_type" />
100                                                         <enum name="cel_generic_type" />
101                                                         <enum name="ast_bridge_snapshot_type" />
102                                                         <enum name="ast_bridge_merge_message_type" />
103                                                         <enum name="ast_channel_entered_bridge_type" />
104                                                         <enum name="ast_channel_left_bridge_type" />
105                                                         <enum name="ast_blind_transfer_type" />
106                                                         <enum name="ast_attended_transfer_type" />
107                                                         <enum name="ast_endpoint_snapshot_type" />
108                                                         <enum name="ast_endpoint_state_type" />
109                                                         <enum name="ast_device_state_message_type" />
110                                                         <enum name="ast_test_suite_message_type" />
111                                                         <enum name="ast_mwi_state_type" />
112                                                         <enum name="ast_mwi_vm_app_type" />
113                                                         <enum name="ast_format_register_type" />
114                                                         <enum name="ast_format_unregister_type" />
115                                                         <enum name="ast_manager_get_generic_type" />
116                                                         <enum name="ast_parked_call_type" />
117                                                         <enum name="ast_channel_snapshot_type" />
118                                                         <enum name="ast_channel_dial_type" />
119                                                         <enum name="ast_channel_varset_type" />
120                                                         <enum name="ast_channel_hangup_request_type" />
121                                                         <enum name="ast_channel_dtmf_begin_type" />
122                                                         <enum name="ast_channel_dtmf_end_type" />
123                                                         <enum name="ast_channel_hold_type" />
124                                                         <enum name="ast_channel_unhold_type" />
125                                                         <enum name="ast_channel_chanspy_start_type" />
126                                                         <enum name="ast_channel_chanspy_stop_type" />
127                                                         <enum name="ast_channel_fax_type" />
128                                                         <enum name="ast_channel_hangup_handler_type" />
129                                                         <enum name="ast_channel_moh_start_type" />
130                                                         <enum name="ast_channel_moh_stop_type" />
131                                                         <enum name="ast_channel_monitor_start_type" />
132                                                         <enum name="ast_channel_monitor_stop_type" />
133                                                         <enum name="ast_channel_agent_login_type" />
134                                                         <enum name="ast_channel_agent_logoff_type" />
135                                                         <enum name="ast_channel_talking_start" />
136                                                         <enum name="ast_channel_talking_stop" />
137                                                         <enum name="ast_security_event_type" />
138                                                         <enum name="ast_named_acl_change_type" />
139                                                         <enum name="ast_local_bridge_type" />
140                                                         <enum name="ast_local_optimization_begin_type" />
141                                                         <enum name="ast_local_optimization_end_type" />
142                                                         <enum name="stasis_subscription_change_type" />
143                                                         <enum name="ast_multi_user_event_type" />
144                                                         <enum name="stasis_cache_clear_type" />
145                                                         <enum name="stasis_cache_update_type" />
146                                                         <enum name="ast_network_change_type" />
147                                                         <enum name="ast_system_registry_type" />
148                                                         <enum name="ast_cc_available_type" />
149                                                         <enum name="ast_cc_offertimerstart_type" />
150                                                         <enum name="ast_cc_requested_type" />
151                                                         <enum name="ast_cc_requestacknowledged_type" />
152                                                         <enum name="ast_cc_callerstopmonitoring_type" />
153                                                         <enum name="ast_cc_callerstartmonitoring_type" />
154                                                         <enum name="ast_cc_callerrecalling_type" />
155                                                         <enum name="ast_cc_recallcomplete_type" />
156                                                         <enum name="ast_cc_failure_type" />
157                                                         <enum name="ast_cc_monitorfailed_type" />
158                                                         <enum name="ast_presence_state_message_type" />
159                                                         <enum name="ast_rtp_rtcp_sent_type" />
160                                                         <enum name="ast_rtp_rtcp_received_type" />
161                                                         <enum name="ast_call_pickup_type" />
162                                                         <enum name="aoc_s_type" />
163                                                         <enum name="aoc_d_type" />
164                                                         <enum name="aoc_e_type" />
165                                                         <enum name="dahdichannel_type" />
166                                                         <enum name="mcid_type" />
167                                                         <enum name="session_timeout_type" />
168                                                         <enum name="cdr_read_message_type" />
169                                                         <enum name="cdr_write_message_type" />
170                                                         <enum name="cdr_prop_write_message_type" />
171                                                         <enum name="corosync_ping_message_type" />
172                                                         <enum name="agi_exec_start_type" />
173                                                         <enum name="agi_exec_end_type" />
174                                                         <enum name="agi_async_start_type" />
175                                                         <enum name="agi_async_exec_type" />
176                                                         <enum name="agi_async_end_type" />
177                                                         <enum name="queue_caller_join_type" />
178                                                         <enum name="queue_caller_leave_type" />
179                                                         <enum name="queue_caller_abandon_type" />
180                                                         <enum name="queue_member_status_type" />
181                                                         <enum name="queue_member_added_type" />
182                                                         <enum name="queue_member_removed_type" />
183                                                         <enum name="queue_member_pause_type" />
184                                                         <enum name="queue_member_penalty_type" />
185                                                         <enum name="queue_member_ringinuse_type" />
186                                                         <enum name="queue_agent_called_type" />
187                                                         <enum name="queue_agent_connect_type" />
188                                                         <enum name="queue_agent_complete_type" />
189                                                         <enum name="queue_agent_dump_type" />
190                                                         <enum name="queue_agent_ringnoanswer_type" />
191                                                         <enum name="meetme_join_type" />
192                                                         <enum name="meetme_leave_type" />
193                                                         <enum name="meetme_end_type" />
194                                                         <enum name="meetme_mute_type" />
195                                                         <enum name="meetme_talking_type" />
196                                                         <enum name="meetme_talk_request_type" />
197                                                         <enum name="appcdr_message_type" />
198                                                         <enum name="forkcdr_message_type" />
199                                                         <enum name="cdr_sync_message_type" />
200                                                 </enumlist>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                 </configFile>
205         </configInfo>
206 ***/
207
208 /*!
209  * \page stasis-impl Stasis Implementation Notes
210  *
211  * \par Reference counting
212  *
213  * Stasis introduces a number of objects, which are tightly related to one
214  * another. Because we rely on ref-counting for memory management, understanding
215  * these relationships is important to understanding this code.
216  *
217  * \code{.txt}
218  *
219  *   stasis_topic <----> stasis_subscription
220  *             ^          ^
221  *              \        /
222  *               \      /
223  *               dispatch
224  *                  |
225  *                  |
226  *                  v
227  *            stasis_message
228  *                  |
229  *                  |
230  *                  v
231  *          stasis_message_type
232  *
233  * \endcode
234  *
235  * The most troubling thing in this chart is the cyclic reference between
236  * stasis_topic and stasis_subscription. This is both unfortunate, and
237  * necessary. Topics need the subscription in order to dispatch messages;
238  * subscriptions need the topics to unsubscribe and check subscription status.
239  *
240  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
241  * topic's reference to a subscription. When the subscription is destroyed, it
242  * will remove its reference to the topic.
243  *
244  * This means that until a subscription has been explicitly unsubscribed, it will
245  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
246  * The destructors of both have assertions regarding this to catch ref-counting
247  * problems where a subscription or topic has had an extra ao2_cleanup().
248  *
249  * The \ref dispatch object is a transient object, which is posted to a
250  * subscription's taskprocessor to send a message to the subscriber. They have
251  * short life cycles, allocated on one thread, destroyed on another.
252  *
253  * During shutdown, or the deletion of a domain object, there are a flurry of
254  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
255  * are processed. Any one of these cleanups could be the one to actually destroy
256  * a given object, so care must be taken to ensure that an object isn't
257  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
258  * that might happen when a RAII_VAR() goes out of scope.
259  *
260  * \par Typical life cycles
261  *
262  *  \li stasis_topic - There are several topics which live for the duration of
263  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
264  *      are actually fed by shorter-lived topics whose lifetime is associated
265  *      with some domain object (like ast_channel_topic() for a given
266  *      ast_channel).
267  *
268  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
269  *      topics, for similar reasons.
270  *
271  *  \li dispatch - Very short lived; just long enough to post a message to a
272  *      subscriber.
273  *
274  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
275  *      irrelevant. Messages are strictly data and have no behavior associated
276  *      with them, so it doesn't really matter if/when they are destroyed. By
277  *      design, a component could hold a ref to a message forever without any
278  *      ill consequences (aside from consuming more memory).
279  *
280  *  \li stasis_message_type - Long life cycles, typically only destroyed on
281  *      module unloading or _clean_ process exit.
282  *
283  * \par Subscriber shutdown sequencing
284  *
285  * Subscribers are sensitive to shutdown sequencing, specifically in how they
286  * reference message types. This is fully detailed on the wiki at
287  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
288  *
289  * In short, the lifetime of the \a data (and \a callback, if in a module) must
290  * be held until the stasis_subscription_final_message() has been received.
291  * Depending on the structure of the subscriber code, this can be handled by
292  * using stasis_subscription_final_message() to free resources on the final
293  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
294  * block until the unsubscribe has completed.
295  */
296
/*! Capacity a topic's subscribers vector is initialized with. */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! Bucket count used for topic pools. */
#define TOPIC_POOL_BUCKETS 57

/*! Threadpool serving topics that do not want a dedicated taskprocessor. */
static struct ast_threadpool *pool;

STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);

/*!
 * Bucket count selected at build time; LOW_MEMORY builds get the smaller value.
 * NOTE(review): presumably sizes the topic_all container -- the allocation is
 * not visible in this part of the file, confirm there.
 */
#ifdef LOW_MEMORY
#define TOPIC_ALL_BUCKETS 257
#else
#define TOPIC_ALL_BUCKETS 997
#endif
317
#ifdef AST_DEVMODE

/*! Bucket count to use for topic statistics */
#define TOPIC_STATISTICS_BUCKETS 57

/*! Bucket count to use for subscription statistics */
#define SUBSCRIPTION_STATISTICS_BUCKETS 57

/*! Container holding the statistics objects of topics */
static struct ao2_container *topic_statistics;

/*! Container holding the statistics objects of subscriptions */
static struct ao2_container *subscription_statistics;

/*!
 * \internal
 * \brief Counters kept for a single stasis message type.
 */
struct stasis_message_type_statistics {
	/*! \brief How many messages of this type were published */
	int published;
	/*! \brief How many messages of this type never reached a subscriber */
	int unused;
	/*! \brief The message type these counters belong to */
	struct stasis_message_type *message_type;
};

/*! Mutex guarding the message type statistics vector */
AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);

/*! Vector of per-message-type statistics entries */
static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;

/*!
 * \internal
 * \brief Statistics kept for a single topic.
 *
 * The topic name is stored inline after the fixed fields, so the object must
 * be allocated with room for the name and its terminator.
 */
struct stasis_topic_statistics {
	/*! \brief Longest time spent dispatching a message to subscribers */
	long highest_time_dispatched;
	/*! \brief Shortest time spent dispatching a message to subscribers */
	long lowest_time_dispatched;
	/*! \brief Count of messages that were not dispatched to any subscriber */
	int messages_not_dispatched;
	/*! \brief Count of messages that were dispatched to at least 1 subscriber */
	int messages_dispatched;
	/*! \brief Ids of the subscribers to this topic */
	struct ao2_container *subscribers;
	/*! \brief Address of the topic (NOT refcounted, and must NOT be dereferenced) */
	struct stasis_topic *topic;
	/*! \brief Topic name, stored inline */
	char name[0];
};
#endif
366
367 /*! \internal */
368 struct stasis_topic {
369         /*! Variable length array of the subscribers */
370         AST_VECTOR(, struct stasis_subscription *) subscribers;
371
372         /*! Topics forwarding into this topic */
373         AST_VECTOR(, struct stasis_topic *) upstream_topics;
374
375 #ifdef AST_DEVMODE
376         struct stasis_topic_statistics *statistics;
377 #endif
378
379         /*! Unique incrementing integer for subscriber ids */
380         int subscriber_id;
381
382         /*! Name of the topic */
383         char *name;
384
385         /*! Detail of the topic */
386         char *detail;
387
388         /*! Creation time */
389         struct timeval *creationtime;
390 };
391
392 struct ao2_container *topic_all;
393
394 struct topic_proxy {
395         AO2_WEAKPROXY();
396
397         char *name;
398         char *detail;
399
400         struct timeval creationtime;
401
402         char buf[0];
403 };
404
405 AO2_STRING_FIELD_HASH_FN(topic_proxy, name);
406 AO2_STRING_FIELD_CMP_FN(topic_proxy, name);
407 AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name);
408
409 static void proxy_dtor(void *weakproxy, void *data)
410 {
411         ao2_unlink(topic_all, weakproxy);
412 }
413
/*
 * Topics and subscriptions are tightly coupled, so the topic-side helpers
 * are declared ahead of the subscription code that calls them.
 */
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
419
/*!
 * \brief Lock two topics.
 *
 * Locks \a topic1 unconditionally, then retries ao2_trylock() on \a topic2,
 * running AO2_DEADLOCK_AVOIDANCE() on \a topic1 after every failed attempt.
 * Both topics are locked when the macro completes.
 *
 * \note Each argument is expanded more than once, so pass expressions
 *       without side effects.
 */
#define topic_lock_both(topic1, topic2) \
	do { \
		ao2_lock(topic1); \
		while (ao2_trylock(topic2)) { \
			AO2_DEADLOCK_AVOIDANCE(topic1); \
		} \
	} while (0)
428
429 static void topic_dtor(void *obj)
430 {
431         struct stasis_topic *topic = obj;
432
433         ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
434                         topic->name, topic->detail);
435
436         /* Subscribers hold a reference to topics, so they should all be
437          * unsubscribed before we get here. */
438         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
439
440         AST_VECTOR_FREE(&topic->subscribers);
441         AST_VECTOR_FREE(&topic->upstream_topics);
442         ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
443
444 #ifdef AST_DEVMODE
445         if (topic->statistics) {
446                 ao2_unlink(topic_statistics, topic->statistics);
447                 ao2_ref(topic->statistics, -1);
448         }
449 #endif
450 }
451
452 #ifdef AST_DEVMODE
453 static void topic_statistics_destroy(void *obj)
454 {
455         struct stasis_topic_statistics *statistics = obj;
456
457         ao2_cleanup(statistics->subscribers);
458 }
459
460 static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
461 {
462         struct stasis_topic_statistics *statistics;
463
464         statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
465         if (!statistics) {
466                 return NULL;
467         }
468
469         statistics->subscribers = ast_str_container_alloc(1);
470         if (!statistics->subscribers) {
471                 ao2_ref(statistics, -1);
472                 return NULL;
473         }
474
475         /* This is strictly used for the pointer address when showing the topic */
476         statistics->topic = topic;
477         strcpy(statistics->name, topic->name); /* SAFE */
478         ao2_link(topic_statistics, statistics);
479
480         return statistics;
481 }
482 #endif
483
484 static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
485 {
486         struct topic_proxy *proxy;
487         struct stasis_topic* topic_tmp;
488
489         if (!topic || !name || !strlen(name) || !detail) {
490                 return -1;
491         }
492
493         ao2_wrlock(topic_all);
494
495         topic_tmp = stasis_topic_get(name);
496         if (topic_tmp) {
497                 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
498                 ao2_ref(topic_tmp, -1);
499                 ao2_unlock(topic_all);
500
501                 return -1;
502         }
503
504         proxy = ao2_t_weakproxy_alloc(
505                         sizeof(*proxy) + strlen(name) + 1 + strlen(detail) + 1, NULL, topic->name);
506         if (!proxy) {
507                 ao2_unlock(topic_all);
508
509                 return -1;
510         }
511
512         /* set the proxy info */
513         proxy->name = proxy->buf;
514         proxy->detail = proxy->name + strlen(name) + 1;
515
516         strcpy(proxy->name, name); /* SAFE */
517         strcpy(proxy->detail, detail); /* SAFE */
518         proxy->creationtime = ast_tvnow();
519
520         /* We have exclusive access to proxy, no need for locking here. */
521         if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
522                 ao2_cleanup(proxy);
523                 ao2_unlock(topic_all);
524
525                 return -1;
526         }
527
528         if (ao2_weakproxy_subscribe(proxy, proxy_dtor, NULL, OBJ_NOLOCK)) {
529                 ao2_cleanup(proxy);
530                 ao2_unlock(topic_all);
531
532                 return -1;
533         }
534
535         /* setting the topic point to the proxy */
536         topic->name = proxy->name;
537         topic->detail = proxy->detail;
538         topic->creationtime = &(proxy->creationtime);
539
540         ao2_link_flags(topic_all, proxy, OBJ_NOLOCK);
541         ao2_ref(proxy, -1);
542
543         ao2_unlock(topic_all);
544
545         return 0;
546 }
547
548 struct stasis_topic *stasis_topic_create_with_detail(
549                 const char *name, const char* detail
550                 )
551 {
552         struct stasis_topic *topic;
553         int res = 0;
554
555         if (!name|| !strlen(name) || !detail) {
556                 return NULL;
557         }
558         ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
559
560         topic = stasis_topic_get(name);
561         if (topic) {
562                 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
563                                 name, detail);
564                 return topic;
565         }
566
567         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
568         if (!topic) {
569                 return NULL;
570         }
571
572         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
573         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
574         if (res) {
575                 ao2_ref(topic, -1);
576                 return NULL;
577         }
578
579         /* link to the proxy */
580         if (link_topic_proxy(topic, name, detail)) {
581                 ao2_ref(topic, -1);
582                 return NULL;
583         }
584
585 #ifdef AST_DEVMODE
586         topic->statistics = stasis_topic_statistics_create(topic);
587         if (!topic->statistics) {
588                 ao2_ref(topic, -1);
589                 return NULL;
590         }
591 #endif
592         ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
593
594         return topic;
595 }
596
/*!
 * \brief Create a topic with an empty detail string.
 *
 * \param name Topic name.
 *
 * \return Whatever stasis_topic_create_with_detail() returns for \a name and
 *         an empty detail.
 */
struct stasis_topic *stasis_topic_create(const char *name)
{
	static const char no_detail[] = "";

	return stasis_topic_create_with_detail(name, no_detail);
}
601
602 struct stasis_topic *stasis_topic_get(const char *name)
603 {
604         return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
605 }
606
607 const char *stasis_topic_name(const struct stasis_topic *topic)
608 {
609         if (!topic) {
610                 return NULL;
611         }
612         return topic->name;
613 }
614
615 const char *stasis_topic_detail(const struct stasis_topic *topic)
616 {
617         if (!topic) {
618                 return NULL;
619         }
620         return topic->detail;
621 }
622
623 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
624 {
625         return AST_VECTOR_SIZE(&topic->subscribers);
626 }
627
628 #ifdef AST_DEVMODE
/*!
 * \internal
 * \brief Statistics kept for a single subscription.
 *
 * The subscription's unique id is stored inline after the fixed fields.
 */
struct stasis_subscription_statistics {
	/*! \brief Source file the subscription originates from */
	const char *file;
	/*! \brief Function the subscription originates from */
	const char *func;
	/*! \brief Names of the topics this subscription is subscribed to */
	struct ao2_container *topics;
	/*! \brief Message type that has taken the longest to process so far */
	struct stasis_message_type *highest_time_message_type;
	/*! \brief Longest time spent invoking a message */
	long highest_time_invoked;
	/*! \brief Shortest time spent invoking a message */
	long lowest_time_invoked;
	/*! \brief Count of messages that were filtered out */
	int messages_dropped;
	/*! \brief Count of messages that passed filtering */
	int messages_passed;
	/*! \brief Whether a mailbox is used to queue messages */
	int uses_mailbox;
	/*! \brief Whether the stasis threadpool handles the messages */
	int uses_threadpool;
	/*! \brief Line number the subscription originates from */
	int lineno;
	/*! \brief Address of the subscription (NOT refcounted, and must NOT be dereferenced) */
	struct stasis_subscription *sub;
	/*! \brief Unique ID of the subscription, stored inline */
	char uniqueid[0];
};
657 #endif
658
/*!
 * \internal
 * \brief State for one subscriber attached to a topic.
 *
 * The object's own ao2 lock is the mutex paired with \a join_cond; it guards
 * the two final_message_* flags.
 */
struct stasis_subscription {
	/*! Unique ID for this subscription */
	char *uniqueid;
	/*! Topic subscribed to. */
	struct stasis_topic *topic;
	/*! Mailbox for processing incoming messages (NULL means callbacks are invoked directly). */
	struct ast_taskprocessor *mailbox;
	/*! Callback function for incoming message processing. */
	stasis_subscription_cb callback;
	/*! Data pointer to be handed to the callback. */
	void *data;

	/*! Condition for joining with subscription. */
	ast_cond_t join_cond;
	/*! Flag set when final message for sub has been received.
	 *  Be sure the subscription's ao2 lock is held before reading/setting. */
	int final_message_rxed;
	/*! Flag set when final message for sub has been processed.
	 *  Be sure the subscription's ao2 lock is held before reading/setting. */
	int final_message_processed;

	/*! The message types this subscription is accepting.
	 *  Indexed by message type id; a non-zero entry means the type is accepted. */
	AST_VECTOR(, char) accepted_message_types;
	/*! The message formatters this subscription is accepting */
	enum stasis_subscription_message_formatters accepted_formatters;
	/*! The message filter currently in use */
	enum stasis_subscription_message_filter filter;

#ifdef AST_DEVMODE
	/*! Statistics information */
	struct stasis_subscription_statistics *statistics;
#endif
};
693
694 static void subscription_dtor(void *obj)
695 {
696         struct stasis_subscription *sub = obj;
697
698         /* Subscriptions need to be manually unsubscribed before destruction
699          * b/c there's a cyclic reference between topics and subscriptions */
700         ast_assert(!stasis_subscription_is_subscribed(sub));
701         /* If there are any messages in flight to this subscription; that would
702          * be bad. */
703         ast_assert(stasis_subscription_is_done(sub));
704
705         ast_free(sub->uniqueid);
706         ao2_cleanup(sub->topic);
707         sub->topic = NULL;
708         ast_taskprocessor_unreference(sub->mailbox);
709         sub->mailbox = NULL;
710         ast_cond_destroy(&sub->join_cond);
711
712         AST_VECTOR_FREE(&sub->accepted_message_types);
713
714 #ifdef AST_DEVMODE
715         if (sub->statistics) {
716                 ao2_unlink(subscription_statistics, sub->statistics);
717                 ao2_ref(sub->statistics, -1);
718         }
719 #endif
720 }
721
722 /*!
723  * \brief Invoke the subscription's callback.
724  * \param sub Subscription to invoke.
725  * \param topic Topic message was published to.
726  * \param message Message to send.
727  */
728 static void subscription_invoke(struct stasis_subscription *sub,
729                                   struct stasis_message *message)
730 {
731         unsigned int final = stasis_subscription_final_message(sub, message);
732         int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
733 #ifdef AST_DEVMODE
734         struct timeval start;
735         long elapsed;
736
737         start = ast_tvnow();
738 #endif
739
740         /* Notify that the final message has been received */
741         if (final) {
742                 ao2_lock(sub);
743                 sub->final_message_rxed = 1;
744                 ast_cond_signal(&sub->join_cond);
745                 ao2_unlock(sub);
746         }
747
748         /*
749          * If filtering is turned on and this is a 'final' message, we only invoke the callback
750          * if the subscriber accepts subscription_change message types.
751          */
752         if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
753                 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
754                 /* Since sub is mostly immutable, no need to lock sub */
755                 sub->callback(sub->data, sub, message);
756         }
757
758         /* Notify that the final message has been processed */
759         if (final) {
760                 ao2_lock(sub);
761                 sub->final_message_processed = 1;
762                 ast_cond_signal(&sub->join_cond);
763                 ao2_unlock(sub);
764         }
765
766 #ifdef AST_DEVMODE
767         elapsed = ast_tvdiff_ms(ast_tvnow(), start);
768         if (elapsed > sub->statistics->highest_time_invoked) {
769                 sub->statistics->highest_time_invoked = elapsed;
770                 ao2_lock(sub->statistics);
771                 sub->statistics->highest_time_message_type = stasis_message_type(message);
772                 ao2_unlock(sub->statistics);
773         }
774         if (elapsed < sub->statistics->lowest_time_invoked) {
775                 sub->statistics->lowest_time_invoked = elapsed;
776         }
777 #endif
778 }
779
780 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
781 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
782
/*
 * Subscription callback that deliberately ignores every message.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	/* Intentionally empty. */
	return;
}
786
787 #ifdef AST_DEVMODE
788 static void subscription_statistics_destroy(void *obj)
789 {
790         struct stasis_subscription_statistics *statistics = obj;
791
792         ao2_cleanup(statistics->topics);
793 }
794
795 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
796         int needs_mailbox, int use_thread_pool, const char *file, int lineno,
797         const char *func)
798 {
799         struct stasis_subscription_statistics *statistics;
800
801         statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
802         if (!statistics) {
803                 return NULL;
804         }
805
806         statistics->topics = ast_str_container_alloc(1);
807         if (!statistics->topics) {
808                 ao2_ref(statistics, -1);
809                 return NULL;
810         }
811
812         statistics->file = file;
813         statistics->lineno = lineno;
814         statistics->func = func;
815         statistics->uses_mailbox = needs_mailbox;
816         statistics->uses_threadpool = use_thread_pool;
817         strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
818         statistics->sub = sub;
819         ao2_link(subscription_statistics, statistics);
820
821         return statistics;
822 }
823 #endif
824
825 struct stasis_subscription *internal_stasis_subscribe(
826         struct stasis_topic *topic,
827         stasis_subscription_cb callback,
828         void *data,
829         int needs_mailbox,
830         int use_thread_pool,
831         const char *file,
832         int lineno,
833         const char *func)
834 {
835         struct stasis_subscription *sub;
836         int ret;
837
838         if (!topic) {
839                 return NULL;
840         }
841
842         /* The ao2 lock is used for join_cond. */
843         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
844         if (!sub) {
845                 return NULL;
846         }
847
848 #ifdef AST_DEVMODE
849         ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
850         sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
851         if (ret < 0 || !sub->statistics) {
852                 ao2_ref(sub, -1);
853                 return NULL;
854         }
855 #else
856         ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
857         if (ret < 0) {
858                 ao2_ref(sub, -1);
859                 return NULL;
860         }
861 #endif
862
863         if (needs_mailbox) {
864                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
865
866                 /* Create name with seq number appended. */
867                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
868                         use_thread_pool ? 'p' : 'm',
869                         stasis_topic_name(topic));
870
871                 /*
872                  * With a small number of subscribers, a thread-per-sub is
873                  * acceptable. For a large number of subscribers, a thread
874                  * pool should be used.
875                  */
876                 if (use_thread_pool) {
877                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
878                 } else {
879                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
880                 }
881                 if (!sub->mailbox) {
882                         ao2_ref(sub, -1);
883
884                         return NULL;
885                 }
886                 ast_taskprocessor_set_local(sub->mailbox, sub);
887                 /* Taskprocessor has a reference */
888                 ao2_ref(sub, +1);
889         }
890
891         ao2_ref(topic, +1);
892         sub->topic = topic;
893         sub->callback = callback;
894         sub->data = data;
895         ast_cond_init(&sub->join_cond, NULL);
896         sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
897         AST_VECTOR_INIT(&sub->accepted_message_types, 0);
898         sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
899
900         if (topic_add_subscription(topic, sub) != 0) {
901                 ao2_ref(sub, -1);
902                 ao2_ref(topic, -1);
903
904                 return NULL;
905         }
906         send_subscription_subscribe(topic, sub);
907
908         return sub;
909 }
910
911 struct stasis_subscription *__stasis_subscribe(
912         struct stasis_topic *topic,
913         stasis_subscription_cb callback,
914         void *data,
915         const char *file,
916         int lineno,
917         const char *func)
918 {
919         return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
920 }
921
922 struct stasis_subscription *__stasis_subscribe_pool(
923         struct stasis_topic *topic,
924         stasis_subscription_cb callback,
925         void *data,
926         const char *file,
927         int lineno,
928         const char *func)
929 {
930         return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
931 }
932
/*
 * Taskprocessor task that releases one reference on the subscription passed
 * as \a data. Always returns 0.
 */
static int sub_cleanup(void *data)
{
	struct stasis_subscription *held = data;

	ao2_cleanup(held);

	return 0;
}
939
940 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
941 {
942         /* The subscription may be the last ref to this topic. Hold
943          * the topic ref open until after the unlock. */
944         struct stasis_topic *topic;
945
946         if (!sub) {
947                 return NULL;
948         }
949
950         topic = ao2_bump(sub->topic);
951
952         /* We have to remove the subscription first, to ensure the unsubscribe
953          * is the final message */
954         if (topic_remove_subscription(sub->topic, sub) != 0) {
955                 ast_log(LOG_ERROR,
956                         "Internal error: subscription has invalid topic\n");
957                 ao2_cleanup(topic);
958
959                 return NULL;
960         }
961
962         /* Now let everyone know about the unsubscribe */
963         send_subscription_unsubscribe(topic, sub);
964
965         /* When all that's done, remove the ref the mailbox has on the sub */
966         if (sub->mailbox) {
967                 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
968                         /* Nothing we can do here, the conditional is just to keep
969                          * the compiler happy that we're not ignoring the result. */
970                 }
971         }
972
973         /* Unsubscribing unrefs the subscription */
974         ao2_cleanup(sub);
975         ao2_cleanup(topic);
976
977         return NULL;
978 }
979
980 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
981         long low_water, long high_water)
982 {
983         int res = -1;
984
985         if (subscription) {
986                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
987                         low_water, high_water);
988         }
989         return res;
990 }
991
992 int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
993         const struct stasis_message_type *type)
994 {
995         if (!subscription) {
996                 return -1;
997         }
998
999         ast_assert(type != NULL);
1000         ast_assert(stasis_message_type_name(type) != NULL);
1001
1002         if (!type || !stasis_message_type_name(type)) {
1003                 /* Filtering is unreliable as this message type is not yet initialized
1004                  * so force all messages through.
1005                  */
1006                 subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1007                 return 0;
1008         }
1009
1010         ao2_lock(subscription->topic);
1011         if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1012                 /* We do this for the same reason as above. The subscription can still operate, so allow
1013                  * it to do so by forcing all messages through.
1014                  */
1015                 subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1016         }
1017         ao2_unlock(subscription->topic);
1018
1019         return 0;
1020 }
1021
1022 int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
1023         const struct stasis_message_type *type)
1024 {
1025         if (!subscription) {
1026                 return -1;
1027         }
1028
1029         ast_assert(type != NULL);
1030         ast_assert(stasis_message_type_name(type) != NULL);
1031
1032         if (!type || !stasis_message_type_name(type)) {
1033                 return 0;
1034         }
1035
1036         ao2_lock(subscription->topic);
1037         if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1038                 /* The memory is already allocated so this can't fail */
1039                 AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
1040         }
1041         ao2_unlock(subscription->topic);
1042
1043         return 0;
1044 }
1045
1046 int stasis_subscription_set_filter(struct stasis_subscription *subscription,
1047         enum stasis_subscription_message_filter filter)
1048 {
1049         if (!subscription) {
1050                 return -1;
1051         }
1052
1053         ao2_lock(subscription->topic);
1054         if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1055                 subscription->filter = filter;
1056         }
1057         ao2_unlock(subscription->topic);
1058
1059         return 0;
1060 }
1061
1062 void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
1063         enum stasis_subscription_message_formatters formatters)
1064 {
1065         ast_assert(subscription != NULL);
1066
1067         ao2_lock(subscription->topic);
1068         subscription->accepted_formatters = formatters;
1069         ao2_unlock(subscription->topic);
1070
1071         return;
1072 }
1073
1074 void stasis_subscription_join(struct stasis_subscription *subscription)
1075 {
1076         if (subscription) {
1077                 ao2_lock(subscription);
1078                 /* Wait until the processed flag has been set */
1079                 while (!subscription->final_message_processed) {
1080                         ast_cond_wait(&subscription->join_cond,
1081                                 ao2_object_get_lockaddr(subscription));
1082                 }
1083                 ao2_unlock(subscription);
1084         }
1085 }
1086
1087 int stasis_subscription_is_done(struct stasis_subscription *subscription)
1088 {
1089         if (subscription) {
1090                 int ret;
1091
1092                 ao2_lock(subscription);
1093                 ret = subscription->final_message_rxed;
1094                 ao2_unlock(subscription);
1095
1096                 return ret;
1097         }
1098
1099         /* Null subscription is about as done as you can get */
1100         return 1;
1101 }
1102
1103 struct stasis_subscription *stasis_unsubscribe_and_join(
1104         struct stasis_subscription *subscription)
1105 {
1106         if (!subscription) {
1107                 return NULL;
1108         }
1109
1110         /* Bump refcount to hold it past the unsubscribe */
1111         ao2_ref(subscription, +1);
1112         stasis_unsubscribe(subscription);
1113         stasis_subscription_join(subscription);
1114         /* Now decrement the refcount back */
1115         ao2_cleanup(subscription);
1116         return NULL;
1117 }
1118
1119 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
1120 {
1121         if (sub) {
1122                 size_t i;
1123                 struct stasis_topic *topic = sub->topic;
1124
1125                 ao2_lock(topic);
1126                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1127                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1128                                 ao2_unlock(topic);
1129                                 return 1;
1130                         }
1131                 }
1132                 ao2_unlock(topic);
1133         }
1134
1135         return 0;
1136 }
1137
1138 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
1139 {
1140         return sub->uniqueid;
1141 }
1142
1143 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
1144 {
1145         struct stasis_subscription_change *change;
1146
1147         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
1148                 return 0;
1149         }
1150
1151         change = stasis_message_data(msg);
1152         if (strcmp("Unsubscribe", change->description)) {
1153                 return 0;
1154         }
1155
1156         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1157                 return 0;
1158         }
1159
1160         return 1;
1161 }
1162
1163 /*!
1164  * \brief Add a subscriber to a topic.
1165  * \param topic Topic
1166  * \param sub Subscriber
1167  * \return 0 on success
1168  * \return Non-zero on error
1169  */
1170 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1171 {
1172         size_t idx;
1173
1174         ao2_lock(topic);
1175         /* The reference from the topic to the subscription is shared with
1176          * the owner of the subscription, which will explicitly unsubscribe
1177          * to release it.
1178          *
1179          * If we bumped the refcount here, the owner would have to unsubscribe
1180          * and cleanup, which is a bit awkward. */
1181         AST_VECTOR_APPEND(&topic->subscribers, sub);
1182
1183         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1184                 topic_add_subscription(
1185                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1186         }
1187
1188 #ifdef AST_DEVMODE
1189         ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1190         ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1191 #endif
1192
1193         ao2_unlock(topic);
1194
1195         return 0;
1196 }
1197
1198 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1199 {
1200         size_t idx;
1201         int res;
1202
1203         ao2_lock(topic);
1204         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1205                 topic_remove_subscription(
1206                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1207         }
1208         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
1209                 AST_VECTOR_ELEM_CLEANUP_NOOP);
1210
1211 #ifdef AST_DEVMODE
1212         if (!res) {
1213                 ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1214                 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1215         }
1216 #endif
1217
1218         ao2_unlock(topic);
1219
1220         return res;
1221 }
1222
1223 /*!
1224  * \internal \brief Dispatch a message to a subscriber asynchronously
1225  * \param local \ref ast_taskprocessor_local object
1226  * \return 0
1227  */
1228 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
1229 {
1230         struct stasis_subscription *sub = local->local_data;
1231         struct stasis_message *message = local->data;
1232
1233         subscription_invoke(sub, message);
1234         ao2_cleanup(message);
1235
1236         return 0;
1237 }
1238
/*!
 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
 * a published message to a subscriber
 *
 * The publisher keeps this on its stack and waits on \a cond until the
 * task sets \a complete.
 */
struct sync_task_data {
	/*! Mutex paired with \a cond; held while reading or setting \a complete */
	ast_mutex_t lock;
	/*! Signalled by the task once the message has been handled */
	ast_cond_t cond;
	/*! Non-zero once the task has finished with the message */
	int complete;
	/*! The message being dispatched */
	void *task_data;
};
1249
1250 /*!
1251  * \internal \brief Dispatch a message to a subscriber synchronously
1252  * \param local \ref ast_taskprocessor_local object
1253  * \return 0
1254  */
1255 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
1256 {
1257         struct stasis_subscription *sub = local->local_data;
1258         struct sync_task_data *std = local->data;
1259         struct stasis_message *message = std->task_data;
1260
1261         subscription_invoke(sub, message);
1262         ao2_cleanup(message);
1263
1264         ast_mutex_lock(&std->lock);
1265         std->complete = 1;
1266         ast_cond_signal(&std->cond);
1267         ast_mutex_unlock(&std->lock);
1268
1269         return 0;
1270 }
1271
1272 /*!
1273  * \internal \brief Dispatch a message to a subscriber
1274  * \param sub The subscriber to dispatch to
1275  * \param message The message to send
1276  * \param synchronous If non-zero, synchronize on the subscriber receiving
1277  * the message
1278  * \retval 0 if message was not dispatched
1279  * \retval 1 if message was dispatched
1280  */
1281 static unsigned int dispatch_message(struct stasis_subscription *sub,
1282         struct stasis_message *message,
1283         int synchronous)
1284 {
1285         int is_final = stasis_subscription_final_message(sub, message);
1286
1287         /*
1288          * The 'do while' gives us an easy way to skip remaining logic once
1289          * we determine the message should be accepted.
1290          * The code looks more verbose than it needs to be but it optimizes
1291          * down very nicely.  It's just easier to understand and debug this way.
1292          */
1293         do {
1294                 struct stasis_message_type *message_type = stasis_message_type(message);
1295                 int type_id = stasis_message_type_id(message_type);
1296                 int type_filter_specified = 0;
1297                 int formatter_filter_specified = 0;
1298                 int type_filter_passed = 0;
1299                 int formatter_filter_passed = 0;
1300
1301                 /* We always accept final messages so only run the filter logic if not final */
1302                 if (is_final) {
1303                         break;
1304                 }
1305
1306                 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1307                 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1308
1309                 /* Accept if no filters of either type were specified */
1310                 if (!type_filter_specified && !formatter_filter_specified) {
1311                         break;
1312                 }
1313
1314                 type_filter_passed = type_filter_specified
1315                         && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1316                         && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1317
1318                 /*
1319                  * Since the type and formatter filters are OR'd, we can skip
1320                  * the formatter check if the type check passes.
1321                  */
1322                 if (type_filter_passed) {
1323                         break;
1324                 }
1325
1326                 formatter_filter_passed = formatter_filter_specified
1327                         && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1328
1329                 if (formatter_filter_passed) {
1330                         break;
1331                 }
1332
1333 #ifdef AST_DEVMODE
1334                 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1335 #endif
1336
1337                 return 0;
1338
1339         } while (0);
1340
1341 #ifdef AST_DEVMODE
1342         ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1343 #endif
1344
1345         if (!sub->mailbox) {
1346                 /* Dispatch directly */
1347                 subscription_invoke(sub, message);
1348                 return 1;
1349         }
1350
1351         /* Bump the message for the taskprocessor push. This will get de-ref'd
1352          * by the task processor callback.
1353          */
1354         ao2_bump(message);
1355         if (!synchronous) {
1356                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
1357                         /* Push failed; ugh. */
1358                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
1359                         ao2_cleanup(message);
1360                         return 0;
1361                 }
1362         } else {
1363                 struct sync_task_data std;
1364
1365                 ast_mutex_init(&std.lock);
1366                 ast_cond_init(&std.cond, NULL);
1367                 std.complete = 0;
1368                 std.task_data = message;
1369
1370                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
1371                         /* Push failed; ugh. */
1372                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1373                         ao2_cleanup(message);
1374                         ast_mutex_destroy(&std.lock);
1375                         ast_cond_destroy(&std.cond);
1376                         return 0;
1377                 }
1378
1379                 ast_mutex_lock(&std.lock);
1380                 while (!std.complete) {
1381                         ast_cond_wait(&std.cond, &std.lock);
1382                 }
1383                 ast_mutex_unlock(&std.lock);
1384
1385                 ast_mutex_destroy(&std.lock);
1386                 ast_cond_destroy(&std.cond);
1387         }
1388
1389         return 1;
1390 }
1391
/*!
 * \internal \brief Publish a message to a topic's subscribers
 * \param topic The topic to publish to
 * \param message The message to publish
 * \param sync_sub An optional subscriber of the topic to publish synchronously
 * to
 *
 * Every subscriber is offered the message while the topic lock is held. In
 * development mode, per-message-type and per-topic statistics are updated
 * as well.
 */
static void publish_msg(struct stasis_topic *topic,
	struct stasis_message *message, struct stasis_subscription *sync_sub)
{
	size_t i;
	unsigned int dispatched = 0;
#ifdef AST_DEVMODE
	int message_type_id = stasis_message_type_id(stasis_message_type(message));
	struct stasis_message_type_statistics *statistics;
	struct timeval start;
	long elapsed;
#endif

	ast_assert(topic != NULL);
	ast_assert(message != NULL);

#ifdef AST_DEVMODE
	/* Make sure a statistics slot exists for this message type */
	ast_mutex_lock(&message_type_statistics_lock);
	if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
		struct stasis_message_type_statistics new_statistics = {
			.published = 0,
		};
		if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
			/* Note: the message is not dispatched at all on this path */
			ast_mutex_unlock(&message_type_statistics_lock);
			return;
		}
	}
	/*
	 * NOTE(review): this pointer into the vector is used after the lock
	 * is released; confirm the vector cannot be resized concurrently.
	 */
	statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
	statistics->message_type = stasis_message_type(message);
	ast_mutex_unlock(&message_type_statistics_lock);

	ast_atomic_fetchadd_int(&statistics->published, +1);
#endif

	/* If there are no subscribers don't bother */
	if (!stasis_topic_subscribers(topic)) {
#ifdef AST_DEVMODE
		ast_atomic_fetchadd_int(&statistics->unused, +1);
		ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
#endif
		return;
	}

	/*
	 * The topic may be unref'ed by the subscription invocation.
	 * Make sure we hold onto a reference while dispatching.
	 */
	ao2_ref(topic, +1);
#ifdef AST_DEVMODE
	start = ast_tvnow();
#endif
	ao2_lock(topic);
	for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
		struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);

		ast_assert(sub != NULL);

		/* Only the subscriber named by sync_sub is waited on */
		dispatched += dispatch_message(sub, message, (sub == sync_sub));
	}
	ao2_unlock(topic);

#ifdef AST_DEVMODE
	elapsed = ast_tvdiff_ms(ast_tvnow(), start);
	if (elapsed > topic->statistics->highest_time_dispatched) {
		topic->statistics->highest_time_dispatched = elapsed;
	}
	if (elapsed < topic->statistics->lowest_time_dispatched) {
		topic->statistics->lowest_time_dispatched = elapsed;
	}
	/* A message counts as dispatched if at least one subscriber took it */
	if (dispatched) {
		ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
	} else {
		ast_atomic_fetchadd_int(&statistics->unused, +1);
		ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
	}
#endif

	ao2_ref(topic, -1);
}
1477
1478 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
1479 {
1480         publish_msg(topic, message, NULL);
1481 }
1482
1483 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
1484 {
1485         ast_assert(sub != NULL);
1486
1487         publish_msg(sub->topic, message, sub);
1488 }
1489
/*!
 * \brief Bookkeeping for one topic-to-topic forward.
 *
 * Messages published to \a from_topic are also delivered to the
 * subscribers of \a to_topic.
 *
 * \note Lock ordering: whenever both topics must be held, take
 * \a to_topic before \a from_topic, otherwise a deadlock is possible.
 */
struct stasis_forward {
	/*! Topic the messages originate from */
	struct stasis_topic *from_topic;
	/*! Topic the messages are forwarded to */
	struct stasis_topic *to_topic;
};
1504
1505 static void forward_dtor(void *obj)
1506 {
1507         struct stasis_forward *forward = obj;
1508
1509         ao2_cleanup(forward->from_topic);
1510         forward->from_topic = NULL;
1511         ao2_cleanup(forward->to_topic);
1512         forward->to_topic = NULL;
1513 }
1514
1515 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
1516 {
1517         int idx;
1518         struct stasis_topic *from;
1519         struct stasis_topic *to;
1520
1521         if (!forward) {
1522                 return NULL;
1523         }
1524
1525         from = forward->from_topic;
1526         to = forward->to_topic;
1527
1528         if (from && to) {
1529                 topic_lock_both(to, from);
1530                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1531                         AST_VECTOR_ELEM_CLEANUP_NOOP);
1532
1533                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1534                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1535                 }
1536                 ao2_unlock(from);
1537                 ao2_unlock(to);
1538         }
1539
1540         ao2_cleanup(forward);
1541
1542         return NULL;
1543 }
1544
1545 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
1546         struct stasis_topic *to_topic)
1547 {
1548         int res;
1549         size_t idx;
1550         struct stasis_forward *forward;
1551
1552         if (!from_topic || !to_topic) {
1553                 return NULL;
1554         }
1555
1556         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1557         if (!forward) {
1558                 return NULL;
1559         }
1560
1561         /* Forwards to ourselves are implicit. */
1562         if (to_topic == from_topic) {
1563                 return forward;
1564         }
1565
1566         forward->from_topic = ao2_bump(from_topic);
1567         forward->to_topic = ao2_bump(to_topic);
1568
1569         topic_lock_both(to_topic, from_topic);
1570         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1571         if (res != 0) {
1572                 ao2_unlock(from_topic);
1573                 ao2_unlock(to_topic);
1574                 ao2_ref(forward, -1);
1575                 return NULL;
1576         }
1577
1578         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1579                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1580         }
1581         ao2_unlock(from_topic);
1582         ao2_unlock(to_topic);
1583
1584         return forward;
1585 }
1586
1587 static void subscription_change_dtor(void *obj)
1588 {
1589         struct stasis_subscription_change *change = obj;
1590
1591         ao2_cleanup(change->topic);
1592 }
1593
1594 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
1595 {
1596         size_t description_len = strlen(description) + 1;
1597         struct stasis_subscription_change *change;
1598
1599         change = ao2_alloc_options(sizeof(*change) + description_len + strlen(uniqueid) + 1,
1600                 subscription_change_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1601         if (!change) {
1602                 return NULL;
1603         }
1604
1605         strcpy(change->description, description); /* SAFE */
1606         change->uniqueid = change->description + description_len;
1607         strcpy(change->uniqueid, uniqueid); /* SAFE */
1608         ao2_ref(topic, +1);
1609         change->topic = topic;
1610
1611         return change;
1612 }
1613
1614 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
1615 {
1616         struct stasis_subscription_change *change;
1617         struct stasis_message *msg;
1618
1619         /* This assumes that we have already unsubscribed */
1620         ast_assert(stasis_subscription_is_subscribed(sub));
1621
1622         if (!stasis_subscription_change_type()) {
1623                 return;
1624         }
1625
1626         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1627         if (!change) {
1628                 return;
1629         }
1630
1631         msg = stasis_message_create(stasis_subscription_change_type(), change);
1632         if (!msg) {
1633                 ao2_cleanup(change);
1634                 return;
1635         }
1636
1637         stasis_publish(topic, msg);
1638         ao2_cleanup(msg);
1639         ao2_cleanup(change);
1640 }
1641
1642 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1643         struct stasis_subscription *sub)
1644 {
1645         struct stasis_subscription_change *change;
1646         struct stasis_message *msg;
1647
1648         /* This assumes that we have already unsubscribed */
1649         ast_assert(!stasis_subscription_is_subscribed(sub));
1650
1651         if (!stasis_subscription_change_type()) {
1652                 return;
1653         }
1654
1655         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1656         if (!change) {
1657                 return;
1658         }
1659
1660         msg = stasis_message_create(stasis_subscription_change_type(), change);
1661         if (!msg) {
1662                 ao2_cleanup(change);
1663                 return;
1664         }
1665
1666         stasis_publish(topic, msg);
1667
1668         /* Now we have to dispatch to the subscription itself */
1669         dispatch_message(sub, msg, 0);
1670
1671         ao2_cleanup(msg);
1672         ao2_cleanup(change);
1673 }
1674
/*!
 * \internal
 * \brief One named topic held by a topic pool.
 *
 * Allocated with extra trailing storage for \a name
 * (see topic_pool_entry_alloc()).
 */
struct topic_pool_entry {
	/*! Forward from \a topic to the pool's own topic */
	struct stasis_forward *forward;
	/*! The pooled topic */
	struct stasis_topic *topic;
	/*! Lookup key for the entry (C99 flexible array member) */
	char name[];
};
1680
1681 static void topic_pool_entry_dtor(void *obj)
1682 {
1683         struct topic_pool_entry *entry = obj;
1684
1685         entry->forward = stasis_forward_cancel(entry->forward);
1686         ao2_cleanup(entry->topic);
1687         entry->topic = NULL;
1688 }
1689
1690 static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1691 {
1692         struct topic_pool_entry *topic_pool_entry;
1693
1694         topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1695                 topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1696         if (!topic_pool_entry) {
1697                 return NULL;
1698         }
1699
1700         strcpy(topic_pool_entry->name, topic_name); /* Safe */
1701
1702         return topic_pool_entry;
1703 }
1704
/*! \brief A pool of named topics that all forward to one pooling topic */
struct stasis_topic_pool {
	/*! Hash container of topic_pool_entry objects, keyed by name */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic forwards to */
	struct stasis_topic *pool_topic;
};
1709
1710 static void topic_pool_dtor(void *obj)
1711 {
1712         struct stasis_topic_pool *pool = obj;
1713
1714 #ifdef AO2_DEBUG
1715         {
1716                 char *container_name =
1717                         ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1718                 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1719                 ao2_container_unregister(container_name);
1720         }
1721 #endif
1722
1723         ao2_cleanup(pool->pool_container);
1724         pool->pool_container = NULL;
1725         ao2_cleanup(pool->pool_topic);
1726         pool->pool_topic = NULL;
1727 }
1728
1729 static int topic_pool_entry_hash(const void *obj, const int flags)
1730 {
1731         const struct topic_pool_entry *object;
1732         const char *key;
1733
1734         switch (flags & OBJ_SEARCH_MASK) {
1735         case OBJ_SEARCH_KEY:
1736                 key = obj;
1737                 break;
1738         case OBJ_SEARCH_OBJECT:
1739                 object = obj;
1740                 key = object->name;
1741                 break;
1742         default:
1743                 /* Hash can only work on something with a full key. */
1744                 ast_assert(0);
1745                 return 0;
1746         }
1747         return ast_str_case_hash(key);
1748 }
1749
1750 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1751 {
1752         const struct topic_pool_entry *object_left = obj;
1753         const struct topic_pool_entry *object_right = arg;
1754         const char *right_key = arg;
1755         int cmp;
1756
1757         switch (flags & OBJ_SEARCH_MASK) {
1758         case OBJ_SEARCH_OBJECT:
1759                 right_key = object_right->name;
1760                 /* Fall through */
1761         case OBJ_SEARCH_KEY:
1762                 cmp = strcasecmp(object_left->name, right_key);
1763                 break;
1764         case OBJ_SEARCH_PARTIAL_KEY:
1765                 /* Not supported by container */
1766                 ast_assert(0);
1767                 cmp = -1;
1768                 break;
1769         default:
1770                 /*
1771                  * What arg points to is specific to this traversal callback
1772                  * and has no special meaning to astobj2.
1773                  */
1774                 cmp = 0;
1775                 break;
1776         }
1777         if (cmp) {
1778                 return 0;
1779         }
1780         /*
1781          * At this point the traversal callback is identical to a sorted
1782          * container.
1783          */
1784         return CMP_MATCH;
1785 }
1786
#ifdef AO2_DEBUG
/*!
 * \internal
 * \brief Print callback registered for topic pool containers.
 *
 * Prints the name of the entry's topic; a NULL entry prints nothing.
 */
static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
	struct topic_pool_entry *pool_entry = v_obj;

	if (pool_entry) {
		prnt(where, "%s", stasis_topic_name(pool_entry->topic));
	}
}
#endif
1798
1799 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1800 {
1801         struct stasis_topic_pool *pool;
1802
1803         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1804         if (!pool) {
1805                 return NULL;
1806         }
1807
1808         pool->pool_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
1809                 TOPIC_POOL_BUCKETS, topic_pool_entry_hash, NULL, topic_pool_entry_cmp);
1810         if (!pool->pool_container) {
1811                 ao2_cleanup(pool);
1812                 return NULL;
1813         }
1814
1815 #ifdef AO2_DEBUG
1816         {
1817                 char *container_name =
1818                         ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1819                 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1820                 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1821         }
1822 #endif
1823
1824         ao2_ref(pooled_topic, +1);
1825         pool->pool_topic = pooled_topic;
1826
1827         return pool;
1828 }
1829
1830 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1831 {
1832         ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1833 }
1834
1835 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1836 {
1837         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1838         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1839         char *new_topic_name;
1840         int ret;
1841
1842         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1843         if (topic_pool_entry) {
1844                 return topic_pool_entry->topic;
1845         }
1846
1847         topic_pool_entry = topic_pool_entry_alloc(topic_name);
1848         if (!topic_pool_entry) {
1849                 return NULL;
1850         }
1851
1852         /* To provide further detail and to ensure that the topic is unique within the scope of the
1853          * system we prefix it with the pooling topic name, which should itself already be unique.
1854          */
1855         ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1856         if (ret < 0) {
1857                 return NULL;
1858         }
1859
1860         topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1861         ast_free(new_topic_name);
1862         if (!topic_pool_entry->topic) {
1863                 return NULL;
1864         }
1865
1866         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1867         if (!topic_pool_entry->forward) {
1868                 return NULL;
1869         }
1870
1871         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1872                 return NULL;
1873         }
1874
1875         return topic_pool_entry->topic;
1876 }
1877
1878 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1879 {
1880         struct topic_pool_entry *topic_pool_entry;
1881
1882         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1883         if (!topic_pool_entry) {
1884                 return 0;
1885         }
1886
1887         ao2_ref(topic_pool_entry, -1);
1888         return 1;
1889 }
1890
/*!
 * \brief Log use of a message type accessor outside the type's lifetime.
 *
 * Only active in AST_DEVMODE builds, and silent for message types that
 * stasis_message_type_declined() reports as declined.
 *
 * \param name Name of the accessor function that was called.
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
	if (stasis_message_type_declined(name)) {
		return;
	}

	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1899
1900 /*! \brief A multi object blob data structure to carry user event stasis messages */
1901 struct ast_multi_object_blob {
1902         struct ast_json *blob;                             /*< A blob of JSON data */
1903         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1904 };
1905
1906 /*!
1907  * \internal
1908  * \brief Destructor for \ref ast_multi_object_blob objects
1909  */
1910 static void multi_object_blob_dtor(void *obj)
1911 {
1912         struct ast_multi_object_blob *multi = obj;
1913         int type;
1914         int i;
1915
1916         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1917                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1918                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1919                 }
1920                 AST_VECTOR_FREE(&multi->snapshots[type]);
1921         }
1922         ast_json_unref(multi->blob);
1923 }
1924
1925 /*! \brief Create a stasis user event multi object blob */
1926 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1927 {
1928         int type;
1929         struct ast_multi_object_blob *multi;
1930
1931         ast_assert(blob != NULL);
1932
1933         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1934         if (!multi) {
1935                 return NULL;
1936         }
1937
1938         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1939                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1940                         ao2_ref(multi, -1);
1941
1942                         return NULL;
1943                 }
1944         }
1945
1946         multi->blob = ast_json_ref(blob);
1947
1948         return multi;
1949 }
1950
1951 /*! \brief Add an object (snapshot) to the blob */
1952 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1953         enum stasis_user_multi_object_snapshot_type type, void *object)
1954 {
1955         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1956                 ao2_cleanup(object);
1957         }
1958 }
1959
1960 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1961 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1962         struct stasis_message_type *type, struct ast_json *blob)
1963 {
1964         struct stasis_message *message;
1965         struct ast_channel_snapshot *channel_snapshot;
1966         struct ast_multi_object_blob *multi;
1967
1968         if (!type) {
1969                 return;
1970         }
1971
1972         multi = ast_multi_object_blob_create(blob);
1973         if (!multi) {
1974                 return;
1975         }
1976
1977         channel_snapshot = ast_channel_snapshot_create(chan);
1978         if (!channel_snapshot) {
1979                 ao2_ref(multi, -1);
1980                 return;
1981         }
1982
1983         /* this call steals the channel_snapshot reference */
1984         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1985
1986         message = stasis_message_create(type, multi);
1987         ao2_ref(multi, -1);
1988         if (message) {
1989                 /* app_userevent still publishes to channel */
1990                 stasis_publish(ast_channel_topic(chan), message);
1991                 ao2_ref(message, -1);
1992         }
1993 }
1994
1995 /*! \internal \brief convert multi object blob to ari json */
1996 static struct ast_json *multi_user_event_to_json(
1997         struct stasis_message *message,
1998         const struct stasis_message_sanitizer *sanitize)
1999 {
2000         struct ast_json *out;
2001         struct ast_multi_object_blob *multi = stasis_message_data(message);
2002         struct ast_json *blob = multi->blob;
2003         const struct timeval *tv = stasis_message_timestamp(message);
2004         enum stasis_user_multi_object_snapshot_type type;
2005         int i;
2006
2007         out = ast_json_object_create();
2008         if (!out) {
2009                 return NULL;
2010         }
2011
2012         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2013         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2014         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2015         ast_json_object_set(out, "userevent", ast_json_ref(blob));
2016
2017         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2018                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2019                         struct ast_json *json_object = NULL;
2020                         char *name = NULL;
2021                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2022
2023                         switch (type) {
2024                         case STASIS_UMOS_CHANNEL:
2025                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2026                                 name = "channel";
2027                                 break;
2028                         case STASIS_UMOS_BRIDGE:
2029                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2030                                 name = "bridge";
2031                                 break;
2032                         case STASIS_UMOS_ENDPOINT:
2033                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2034                                 name = "endpoint";
2035                                 break;
2036                         }
2037                         if (json_object) {
2038                                 ast_json_object_set(out, name, json_object);
2039                         }
2040                 }
2041         }
2042
2043         return out;
2044 }
2045
2046 /*! \internal \brief convert multi object blob to ami string */
2047 static struct ast_str *multi_object_blob_to_ami(void *obj)
2048 {
2049         struct ast_str *ami_str=ast_str_create(1024);
2050         struct ast_str *ami_snapshot;
2051         const struct ast_multi_object_blob *multi = obj;
2052         enum stasis_user_multi_object_snapshot_type type;
2053         int i;
2054
2055         if (!ami_str) {
2056                 return NULL;
2057         }
2058         if (!multi) {
2059                 ast_free(ami_str);
2060                 return NULL;
2061         }
2062
2063         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2064                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2065                         char *name = NULL;
2066                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2067                         ami_snapshot = NULL;
2068
2069                         if (i > 0) {
2070                                 ast_asprintf(&name, "%d", i + 1);
2071                         }
2072
2073                         switch (type) {
2074                         case STASIS_UMOS_CHANNEL:
2075                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2076                                 break;
2077
2078                         case STASIS_UMOS_BRIDGE:
2079                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2080                                 break;
2081
2082                         case STASIS_UMOS_ENDPOINT:
2083                                 /* currently not sending endpoint snapshots to AMI */
2084                                 break;
2085                         }
2086                         if (ami_snapshot) {
2087                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2088                                 ast_free(ami_snapshot);
2089                         }
2090                         ast_free(name);
2091                 }
2092         }
2093
2094         return ami_str;
2095 }
2096
/*!
 * \internal
 * \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key being considered.
 *
 * \retval 1 for the "eventname" key.
 * \retval 0 for every other key.
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0 ? 1 : 0;
}
2105
2106 static struct ast_manager_event_blob *multi_user_event_to_ami(
2107         struct stasis_message *message)
2108 {
2109         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2110         RAII_VAR(struct ast_str *, body, NULL, ast_free);
2111         struct ast_multi_object_blob *multi = stasis_message_data(message);
2112         const char *eventname;
2113
2114         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2115         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
2116         object_string = multi_object_blob_to_ami(multi);
2117         if (!object_string || !body) {
2118                 return NULL;
2119         }
2120
2121         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
2122                 "%s"
2123                 "UserEvent: %s\r\n"
2124                 "%s",
2125                 ast_str_buffer(object_string),
2126                 eventname,
2127                 ast_str_buffer(body));
2128 }
2129
/*! \brief Global configuration: the message types that are declined */
struct stasis_declined_config {
	/*! Container of the names of message types to decline */
	struct ao2_container *declined;
};
2135
/*! \brief Configuration options for the thread pool */
struct stasis_threadpool_conf {
	/*! Number of threads the pool starts with */
	int initial_size;
	/*! Idle time, in seconds, after which a thread expires */
	int idle_timeout_sec;
	/*! Maximum number of threads to allow */
	int max_size;
};
2145
/*! \brief Top-level stasis configuration object */
struct stasis_config {
	/*! Options for the thread pool */
	struct stasis_threadpool_conf *threadpool_options;
	/*! Message types that are declined */
	struct stasis_declined_config *declined_message_types;
};
2152
/*! \brief An aco_type structure to link the "threadpool" category to the stasis_threadpool_conf type */
static struct aco_type threadpool_option = {
	.type = ACO_GLOBAL,
	.name = "threadpool",
	.item_offset = offsetof(struct stasis_config, threadpool_options),
	.category = "threadpool",
	.category_match = ACO_WHITELIST_EXACT,
};

/*! \brief Type list containing only the threadpool option type */
static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);

/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
	.type = ACO_GLOBAL,
	.name = "declined_message_types",
	.item_offset = offsetof(struct stasis_config, declined_message_types),
	.category_match = ACO_WHITELIST_EXACT,
	.category = "declined_message_types",
};

/*! \brief Type list containing only the declined message types option type */
struct aco_type *declined_options[] = ACO_TYPES(&declined_option);

/*! \brief The stasis.conf file and the category types it may contain */
struct aco_file stasis_conf = {
	.filename = "stasis.conf",
	.types = ACO_TYPES(&declined_option, &threadpool_option),
};

/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
static AO2_GLOBAL_OBJ_STATIC(globals);

/* Forward declaration: CONFIG_INFO_CORE below needs the allocator by name. */
static void *stasis_config_alloc(void);

/*! \brief Register information about the configs being processed by this module */
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
	.files = ACO_FILES(&stasis_conf),
);
2188
2189 static void stasis_declined_config_destructor(void *obj)
2190 {
2191         struct stasis_declined_config *declined = obj;
2192
2193         ao2_cleanup(declined->declined);
2194 }
2195
2196 static void stasis_config_destructor(void *obj)
2197 {
2198         struct stasis_config *cfg = obj;
2199
2200         ao2_cleanup(cfg->declined_message_types);
2201         ast_free(cfg->threadpool_options);
2202 }
2203
2204 static void *stasis_config_alloc(void)
2205 {
2206         struct stasis_config *cfg;
2207
2208         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2209                 return NULL;
2210         }
2211
2212         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2213         if (!cfg->threadpool_options) {
2214                 ao2_ref(cfg, -1);
2215                 return NULL;
2216         }
2217
2218         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
2219                 stasis_declined_config_destructor);
2220         if (!cfg->declined_message_types) {
2221                 ao2_ref(cfg, -1);
2222                 return NULL;
2223         }
2224
2225         cfg->declined_message_types->declined = ast_str_container_alloc(13);
2226         if (!cfg->declined_message_types->declined) {
2227                 ao2_ref(cfg, -1);
2228                 return NULL;
2229         }
2230
2231         return cfg;
2232 }
2233
2234 int stasis_message_type_declined(const char *name)
2235 {
2236         struct stasis_config *cfg = ao2_global_obj_ref(globals);
2237         char *name_in_declined;
2238         int res;
2239
2240         if (!cfg || !cfg->declined_message_types) {
2241                 ao2_cleanup(cfg);
2242                 return 0;
2243         }
2244
2245         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2246         res = name_in_declined ? 1 : 0;
2247         ao2_cleanup(name_in_declined);
2248         ao2_ref(cfg, -1);
2249         if (res) {
2250                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2251         }
2252         return res;
2253 }
2254
2255 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2256 {
2257         struct stasis_declined_config *declined = obj;
2258
2259         if (ast_strlen_zero(var->value)) {
2260                 return 0;
2261         }
2262
2263         if (ast_str_container_add(declined->declined, var->value)) {
2264                 return -1;
2265         }
2266
2267         return 0;
2268 }
2269
/*!
 * @{ \brief Define multi user event message type(s).
 *
 * The type converts to JSON with multi_user_event_to_json() and to AMI
 * with multi_user_event_to_ami().
 */

STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
	.to_json = multi_user_event_to_json,
	.to_ami = multi_user_event_to_ami,
	);

/*! @} */
2280
2281 /*!
2282  * \internal
2283  * \brief CLI command implementation for 'stasis show topics'
2284  */
2285 static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2286 {
2287         struct ao2_iterator iter;
2288         struct topic_proxy *topic;
2289         struct ao2_container *tmp_container;
2290         int count = 0;
2291 #define FMT_HEADERS             "%-64s %-64s\n"
2292 #define FMT_FIELDS              "%-64s %-64s\n"
2293
2294         switch (cmd) {
2295         case CLI_INIT:
2296                 e->command = "stasis show topics";
2297                 e->usage =
2298                         "Usage: stasis show topics\n"
2299                         "       Shows a list of topics\n";
2300                 return NULL;
2301         case CLI_GENERATE:
2302                 return NULL;
2303         }
2304
2305         if (a->argc != e->args) {
2306                 return CLI_SHOWUSAGE;
2307         }
2308
2309         ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2310
2311         tmp_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2312                                 topic_proxy_sort_fn, NULL);
2313
2314         if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2315                 ao2_cleanup(tmp_container);
2316
2317                 return NULL;
2318         }
2319
2320         /* getting all topic in order */
2321         iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2322         while ((topic = ao2_iterator_next(&iter))) {
2323                 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2324                 ao2_ref(topic, -1);
2325                 ++count;
2326         }
2327         ao2_iterator_destroy(&iter);
2328         ao2_cleanup(tmp_container);
2329
2330         ast_cli(a->fd, "\n%d Total topics\n\n", count);
2331
2332 #undef FMT_HEADERS
2333 #undef FMT_FIELDS
2334
2335         return CLI_SUCCESS;
2336 }
2337
2338 /*!
2339  * \internal
2340  * \brief CLI tab completion for topic names
2341  */
2342 static char *topic_complete_name(const char *word)
2343 {
2344         struct topic_proxy *topic;
2345         struct ao2_iterator it;
2346         int wordlen = strlen(word);
2347         int ret;
2348
2349         it = ao2_iterator_init(topic_all, 0);
2350         while ((topic = ao2_iterator_next(&it))) {
2351                 if (!strncasecmp(word, topic->name, wordlen)) {
2352                         ret = ast_cli_completion_add(ast_strdup(topic->name));
2353                         if (ret) {
2354                                 ao2_ref(topic, -1);
2355                                 break;
2356                         }
2357                 }
2358                 ao2_ref(topic, -1);
2359         }
2360         ao2_iterator_destroy(&it);
2361         return NULL;
2362 }
2363
2364 /*!
2365  * \internal
2366  * \brief CLI command implementation for 'stasis show topic'
2367  */
2368 static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2369 {
2370         struct stasis_topic *topic;
2371         char print_time[32];
2372
2373         switch (cmd) {
2374         case CLI_INIT:
2375                 e->command = "stasis show topic";
2376                 e->usage =
2377                     "Usage: stasis show topic <name>\n"
2378                     "       Show stasis topic detail info.\n";
2379                 return NULL;
2380         case CLI_GENERATE:
2381                 if (a->pos == 3) {
2382                         return topic_complete_name(a->word);
2383                 } else {
2384                         return NULL;
2385                 }
2386         }
2387
2388         if (a->argc != 4) {
2389                 return CLI_SHOWUSAGE;
2390         }
2391
2392         topic = stasis_topic_get(a->argv[3]);
2393         if (!topic) {
2394                 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2395                 return CLI_FAILURE;
2396         }
2397
2398         ast_cli(a->fd, "Name: %s\n", topic->name);
2399         ast_cli(a->fd, "Detail: %s\n", topic->detail);
2400         ast_cli(a->fd, "Subscribers count: %lu\n", AST_VECTOR_SIZE(&topic->subscribers));
2401         ast_cli(a->fd, "Forwarding topic count: %lu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2402         ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2403         ast_cli(a->fd, "Duration time: %s\n", print_time);
2404
2405         ao2_ref(topic, -1);
2406
2407         return CLI_SUCCESS;
2408 }
2409
2410
2411 static struct ast_cli_entry cli_stasis[] = {
2412         AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2413         AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2414 };
2415
2416
2417 #ifdef AST_DEVMODE
2418
2419 AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2420
2421 /*!
2422  * \internal
2423  * \brief CLI command implementation for 'stasis statistics show subscriptions'
2424  */
2425 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2426 {
2427         struct ao2_container *sorted_subscriptions;
2428         struct ao2_iterator iter;
2429         struct stasis_subscription_statistics *statistics;
2430         int count = 0;
2431         int dropped = 0;
2432         int passed = 0;
2433 #define FMT_HEADERS             "%-64s %10s %10s %16s %16s\n"
2434 #define FMT_FIELDS              "%-64s %10d %10d %16ld %16ld\n"
2435 #define FMT_FIELDS2             "%-64s %10d %10d\n"
2436
2437         switch (cmd) {
2438         case CLI_INIT:
2439                 e->command = "stasis statistics show subscriptions";
2440                 e->usage =
2441                         "Usage: stasis statistics show subscriptions\n"
2442                         "       Shows a list of subscriptions and their general statistics\n";
2443                 return NULL;
2444         case CLI_GENERATE:
2445                 return NULL;
2446         }
2447
2448         if (a->argc != e->args) {
2449                 return CLI_SHOWUSAGE;
2450         }
2451
2452         sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2453                 stasis_subscription_statistics_sort_fn, NULL);
2454         if (!sorted_subscriptions) {
2455                 ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2456                 return CLI_SUCCESS;
2457         }
2458
2459         if (ao2_container_dup(sorted_subscriptions, subscription_statistics, 0)) {
2460                 ao2_ref(sorted_subscriptions, -1);
2461                 ast_cli(a->fd, "Could not sort subscription statistics\n");
2462                 return CLI_SUCCESS;
2463         }
2464
2465         ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2466
2467         iter = ao2_iterator_init(sorted_subscriptions, 0);
2468         while ((statistics = ao2_iterator_next(&iter))) {
2469                 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2470                         statistics->lowest_time_invoked, statistics->highest_time_invoked);
2471                 dropped += statistics->messages_dropped;
2472                 passed += statistics->messages_passed;
2473                 ao2_ref(statistics, -1);
2474                 ++count;
2475         }
2476         ao2_iterator_destroy(&iter);
2477
2478         ao2_ref(sorted_subscriptions, -1);
2479
2480         ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2481         ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2482
2483 #undef FMT_HEADERS
2484 #undef FMT_FIELDS
2485 #undef FMT_FIELDS2
2486
2487         return CLI_SUCCESS;
2488 }
2489
2490 /*!
2491  * \internal
2492  * \brief CLI tab completion for subscription statistics names
2493  */
2494 static char *subscription_statistics_complete_name(const char *word, int state)
2495 {
2496         struct stasis_subscription_statistics *statistics;
2497         struct ao2_iterator it_statistics;
2498         int wordlen = strlen(word);
2499         int which = 0;
2500         char *result = NULL;
2501
2502         it_statistics = ao2_iterator_init(subscription_statistics, 0);
2503         while ((statistics = ao2_iterator_next(&it_statistics))) {
2504                 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2505                         && ++which > state) {
2506                         result = ast_strdup(statistics->uniqueid);
2507                 }
2508                 ao2_ref(statistics, -1);
2509                 if (result) {
2510                         break;
2511                 }
2512         }
2513         ao2_iterator_destroy(&it_statistics);
2514         return result;
2515 }
2516
2517 /*!
2518  * \internal
2519  * \brief CLI command implementation for 'stasis statistics show subscription'
2520  */
2521 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2522 {
2523         struct stasis_subscription_statistics *statistics;
2524         struct ao2_iterator i;
2525         char *name;
2526
2527         switch (cmd) {
2528         case CLI_INIT:
2529                 e->command = "stasis statistics show subscription";
2530                 e->usage =
2531                     "Usage: stasis statistics show subscription <uniqueid>\n"
2532                     "       Show stasis subscription statistics.\n";
2533                 return NULL;
2534         case CLI_GENERATE:
2535                 if (a->pos == 4) {
2536                         return subscription_statistics_complete_name(a->word, a->n);
2537                 } else {
2538                         return NULL;
2539                 }
2540         }
2541
2542         if (a->argc != 5) {
2543                 return CLI_SHOWUSAGE;
2544         }
2545
2546         statistics = ao2_find(subscription_statistics, a->argv[4], OBJ_SEARCH_KEY);
2547         if (!statistics) {
2548                 ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2549                 return CLI_FAILURE;
2550         }
2551
2552         ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2553         ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2554         ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2555         ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2556         ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2557         ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2558         ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2559         ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2560         ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2561         ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2562         ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2563
2564         ao2_lock(statistics);
2565         if (statistics->highest_time_message_type) {
2566                 ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2567         }
2568         ao2_unlock(statistics);
2569
2570         ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2571
2572         ast_cli(a->fd, "Subscribed topics:\n");
2573         i = ao2_iterator_init(statistics->topics, 0);
2574         while ((name = ao2_iterator_next(&i))) {
2575                 ast_cli(a->fd, "\t%s\n", name);
2576                 ao2_ref(name, -1);
2577         }
2578         ao2_iterator_destroy(&i);
2579
2580         ao2_ref(statistics, -1);
2581
2582         return CLI_SUCCESS;
2583 }
2584
2585 AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2586
2587 /*!
2588  * \internal
2589  * \brief CLI command implementation for 'stasis statistics show topics'
2590  */
2591 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2592 {
2593         struct ao2_container *sorted_topics;
2594         struct ao2_iterator iter;
2595         struct stasis_topic_statistics *statistics;
2596         int count = 0;
2597         int not_dispatched = 0;
2598         int dispatched = 0;
2599 #define FMT_HEADERS             "%-64s %10s %10s %10s %16s %16s\n"
2600 #define FMT_FIELDS              "%-64s %10d %10d %10d %16ld %16ld\n"
2601 #define FMT_FIELDS2             "%-64s %10s %10d %10d\n"
2602
2603         switch (cmd) {
2604         case CLI_INIT:
2605                 e->command = "stasis statistics show topics";
2606                 e->usage =
2607                         "Usage: stasis statistics show topics\n"
2608                         "       Shows a list of topics and their general statistics\n";
2609                 return NULL;
2610         case CLI_GENERATE:
2611                 return NULL;
2612         }
2613
2614         if (a->argc != e->args) {
2615                 return CLI_SHOWUSAGE;
2616         }
2617
2618         sorted_topics = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2619                 stasis_topic_statistics_sort_fn, NULL);
2620         if (!sorted_topics) {
2621                 ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2622                 return CLI_SUCCESS;
2623         }
2624
2625         if (ao2_container_dup(sorted_topics, topic_statistics, 0)) {
2626                 ao2_ref(sorted_topics, -1);
2627                 ast_cli(a->fd, "Could not sort topic statistics\n");
2628                 return CLI_SUCCESS;
2629         }
2630
2631         ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2632
2633         iter = ao2_iterator_init(sorted_topics, 0);
2634         while ((statistics = ao2_iterator_next(&iter))) {
2635                 ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2636                         statistics->messages_not_dispatched, statistics->messages_dispatched,
2637                         statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2638                 not_dispatched += statistics->messages_not_dispatched;
2639                 dispatched += statistics->messages_dispatched;
2640                 ao2_ref(statistics, -1);
2641                 ++count;
2642         }
2643         ao2_iterator_destroy(&iter);
2644
2645         ao2_ref(sorted_topics, -1);
2646
2647         ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2648         ast_cli(a->fd, "\n%d topics\n\n", count);
2649
2650 #undef FMT_HEADERS
2651 #undef FMT_FIELDS
2652 #undef FMT_FIELDS2
2653
2654         return CLI_SUCCESS;
2655 }
2656
2657 /*!
2658  * \internal
2659  * \brief CLI tab completion for topic statistics names
2660  */
2661 static char *topic_statistics_complete_name(const char *word, int state)
2662 {
2663         struct stasis_topic_statistics *statistics;
2664         struct ao2_iterator it_statistics;
2665         int wordlen = strlen(word);
2666         int which = 0;
2667         char *result = NULL;
2668
2669         it_statistics = ao2_iterator_init(topic_statistics, 0);
2670         while ((statistics = ao2_iterator_next(&it_statistics))) {
2671                 if (!strncasecmp(word, statistics->name, wordlen)
2672                         && ++which > state) {
2673                         result = ast_strdup(statistics->name);
2674                 }
2675                 ao2_ref(statistics, -1);
2676                 if (result) {
2677                         break;
2678                 }
2679         }
2680         ao2_iterator_destroy(&it_statistics);
2681         return result;
2682 }
2683
2684 /*!
2685  * \internal
2686  * \brief CLI command implementation for 'stasis statistics show topic'
2687  */
2688 static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2689 {
2690         struct stasis_topic_statistics *statistics;
2691         struct ao2_iterator i;
2692         char *uniqueid;
2693
2694         switch (cmd) {
2695         case CLI_INIT:
2696                 e->command = "stasis statistics show topic";
2697                 e->usage =
2698                     "Usage: stasis statistics show topic <name>\n"
2699                     "       Show stasis topic statistics.\n";
2700                 return NULL;
2701         case CLI_GENERATE:
2702                 if (a->pos == 4) {
2703                         return topic_statistics_complete_name(a->word, a->n);
2704                 } else {
2705                         return NULL;
2706                 }
2707         }
2708
2709         if (a->argc != 5) {
2710                 return CLI_SHOWUSAGE;
2711         }
2712
2713         statistics = ao2_find(topic_statistics, a->argv[4], OBJ_SEARCH_KEY);
2714         if (!statistics) {
2715                 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2716                 return CLI_FAILURE;
2717         }
2718
2719         ast_cli(a->fd, "Topic: %s\n", statistics->name);
2720         ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2721         ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2722         ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2723         ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2724         ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2725         ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2726
2727         ast_cli(a->fd, "Subscribers:\n");
2728         i = ao2_iterator_init(statistics->subscribers, 0);
2729         while ((uniqueid = ao2_iterator_next(&i))) {
2730                 ast_cli(a->fd, "\t%s\n", uniqueid);
2731                 ao2_ref(uniqueid, -1);
2732         }
2733         ao2_iterator_destroy(&i);
2734
2735         ao2_ref(statistics, -1);
2736
2737         return CLI_SUCCESS;
2738 }
2739
2740 /*!
2741  * \internal
2742  * \brief CLI command implementation for 'stasis statistics show messages'
2743  */
2744 static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2745 {
2746         int i;
2747         int count = 0;
2748         int published = 0;
2749         int unused = 0;
2750 #define FMT_HEADERS             "%-64s %10s %10s\n"
2751 #define FMT_FIELDS              "%-64s %10d %10d\n"
2752
2753         switch (cmd) {
2754         case CLI_INIT:
2755                 e->command = "stasis statistics show messages";
2756                 e->usage =
2757                         "Usage: stasis statistics show messages\n"
2758                         "       Shows a list of message types and their general statistics\n";
2759                 return NULL;
2760         case CLI_GENERATE:
2761                 return NULL;
2762         }
2763
2764         if (a->argc != e->args) {
2765                 return CLI_SHOWUSAGE;
2766         }
2767
2768         ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2769
2770         ast_mutex_lock(&message_type_statistics_lock);
2771         for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2772                 struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2773
2774                 if (!statistics->message_type) {
2775                         continue;
2776                 }
2777
2778                 ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2779                         statistics->unused);
2780                 published += statistics->published;
2781                 unused += statistics->unused;
2782                 ++count;
2783         }
2784         ast_mutex_unlock(&message_type_statistics_lock);
2785
2786         ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2787         ast_cli(a->fd, "\n%d seen message types\n\n", count);
2788
2789 #undef FMT_HEADERS
2790 #undef FMT_FIELDS
2791
2792         return CLI_SUCCESS;
2793 }
2794
2795 static struct ast_cli_entry cli_stasis_statistics[] = {
2796         AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2797         AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2798         AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2799         AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2800         AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2801 };
2802
2803 static int subscription_statistics_hash(const void *obj, const int flags)
2804 {
2805         const struct stasis_subscription_statistics *object;
2806         const char *key;
2807
2808         switch (flags & OBJ_SEARCH_MASK) {
2809         case OBJ_SEARCH_KEY:
2810                 key = obj;
2811                 break;
2812         case OBJ_SEARCH_OBJECT:
2813                 object = obj;
2814                 key = object->uniqueid;
2815                 break;
2816         default:
2817                 /* Hash can only work on something with a full key. */
2818                 ast_assert(0);
2819                 return 0;
2820         }
2821         return ast_str_case_hash(key);
2822 }
2823
2824 static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2825 {
2826         const struct stasis_subscription_statistics *object_left = obj;
2827         const struct stasis_subscription_statistics *object_right = arg;
2828         const char *right_key = arg;
2829         int cmp;
2830
2831         switch (flags & OBJ_SEARCH_MASK) {
2832         case OBJ_SEARCH_OBJECT:
2833                 right_key = object_right->uniqueid;
2834                 /* Fall through */
2835         case OBJ_SEARCH_KEY:
2836                 cmp = strcasecmp(object_left->uniqueid, right_key);
2837                 break;
2838         case OBJ_SEARCH_PARTIAL_KEY:
2839                 /* Not supported by container */
2840                 ast_assert(0);
2841                 cmp = -1;
2842                 break;
2843         default:
2844                 /*
2845                  * What arg points to is specific to this traversal callback
2846                  * and has no special meaning to astobj2.
2847                  */
2848                 cmp = 0;
2849                 break;
2850         }
2851         if (cmp) {
2852                 return 0;
2853         }
2854         /*
2855          * At this point the traversal callback is identical to a sorted
2856          * container.
2857          */
2858         return CMP_MATCH;
2859 }
2860
2861 static int topic_statistics_hash(const void *obj, const int flags)
2862 {
2863         const struct stasis_topic_statistics *object;
2864         const char *key;
2865
2866         switch (flags & OBJ_SEARCH_MASK) {
2867         case OBJ_SEARCH_KEY:
2868                 key = obj;
2869                 break;
2870         case OBJ_SEARCH_OBJECT:
2871                 object = obj;
2872                 key = object->name;
2873                 break;
2874         default:
2875                 /* Hash can only work on something with a full key. */
2876                 ast_assert(0);
2877                 return 0;
2878         }
2879         return ast_str_case_hash(key);
2880 }
2881
2882 static int topic_statistics_cmp(void *obj, void *arg, int flags)
2883 {
2884         const struct stasis_topic_statistics *object_left = obj;
2885         const struct stasis_topic_statistics *object_right = arg;
2886         const char *right_key = arg;
2887         int cmp;
2888
2889         switch (flags & OBJ_SEARCH_MASK) {
2890         case OBJ_SEARCH_OBJECT:
2891                 right_key = object_right->name;
2892                 /* Fall through */
2893         case OBJ_SEARCH_KEY:
2894                 cmp = strcasecmp(object_left->name, right_key);
2895                 break;
2896         case OBJ_SEARCH_PARTIAL_KEY:
2897                 /* Not supported by container */
2898                 ast_assert(0);
2899                 cmp = -1;
2900                 break;
2901         default:
2902                 /*
2903                  * What arg points to is specific to this traversal callback
2904                  * and has no special meaning to astobj2.
2905                  */
2906                 cmp = 0;
2907                 break;
2908         }
2909         if (cmp) {
2910                 return 0;
2911         }
2912         /*
2913          * At this point the traversal callback is identical to a sorted
2914          * container.
2915          */
2916         return CMP_MATCH;
2917 }
2918 #endif
2919
2920 /*! \brief Cleanup function for graceful shutdowns */
2921 static void stasis_cleanup(void)
2922 {
2923 #ifdef AST_DEVMODE
2924         ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
2925         AST_VECTOR_FREE(&message_type_statistics);
2926         ao2_cleanup(subscription_statistics);
2927         ao2_cleanup(topic_statistics);
2928 #endif
2929         ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
2930         ao2_cleanup(topic_all);
2931         topic_all = NULL;
2932         ast_threadpool_shutdown(pool);
2933         pool = NULL;
2934         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
2935         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
2936         aco_info_destroy(&cfg_info);
2937         ao2_global_obj_release(globals);
2938 }
2939
2940 int stasis_init(void)
2941 {
2942         struct stasis_config *cfg;
2943         int cache_init;
2944         struct ast_threadpool_options threadpool_opts = { 0, };
2945
2946         /* Be sure the types are cleaned up after the message bus */
2947         ast_register_cleanup(stasis_cleanup);
2948
2949         if (aco_info_init(&cfg_info)) {
2950                 return -1;
2951         }
2952
2953         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
2954                 declined_options, "", declined_handler, 0);
2955         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
2956                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
2957                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
2958                 INT_MAX);
2959         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
2960                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
2961                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
2962                 INT_MAX);
2963         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
2964                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
2965                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
2966                 INT_MAX);
2967
2968         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
2969                 struct stasis_config *default_cfg = stasis_config_alloc();
2970
2971                 if (!default_cfg) {
2972                         return -1;
2973                 }
2974
2975                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
2976                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
2977                         ao2_ref(default_cfg, -1);
2978
2979                         return -1;
2980                 }
2981
2982                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
2983                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
2984                         ao2_ref(default_cfg, -1);
2985
2986                         return -1;
2987                 }
2988
2989                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
2990                 ao2_global_obj_replace_unref(globals, default_cfg);
2991                 cfg = default_cfg;
2992         } else {
2993                 cfg = ao2_global_obj_ref(globals);
2994                 if (!cfg) {
2995                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
2996
2997                         return -1;
2998                 }
2999         }
3000
3001         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3002         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3003         threadpool_opts.auto_increment = 1;
3004         threadpool_opts.max_size = cfg->threadpool_options->max_size;
3005         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3006         pool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3007         ao2_ref(cfg, -1);
3008         if (!pool) {
3009                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3010
3011                 return -1;
3012         }
3013
3014         cache_init = stasis_cache_init();
3015         if (cache_init != 0) {
3016                 return -1;
3017         }
3018
3019         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
3020                 return -1;
3021         }
3022         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
3023                 return -1;
3024         }
3025
3026         topic_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_ALL_BUCKETS,
3027                         topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3028         if (!topic_all) {
3029                 return -1;
3030         }
3031
3032         if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
3033                 return -1;
3034         }
3035
3036 #ifdef AST_DEVMODE
3037         /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3038          * topic or subscripton.
3039          */
3040         subscription_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3041                 subscription_statistics_hash, 0, subscription_statistics_cmp);
3042         if (!subscription_statistics) {
3043                 return -1;
3044         }
3045
3046         topic_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3047                 topic_statistics_hash, 0, topic_statistics_cmp);
3048         if (!topic_statistics) {
3049                 return -1;
3050         }
3051
3052         AST_VECTOR_INIT(&message_type_statistics, 0);
3053
3054         if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3055                 return -1;
3056         }
3057 #endif
3058
3059         return 0;
3060 }