0c60b13290bdf2361854f74bca141658e81afce2
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subcription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has be explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there are a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how the
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
296 /*! Initial size of the subscribers list. */
297 #define INITIAL_SUBSCRIBERS_MAX 4
298
299 /*! The number of buckets to use for topic pools */
300 #define TOPIC_POOL_BUCKETS 57
301
302 /*! Thread pool for topics that don't want a dedicated taskprocessor */
303 static struct ast_threadpool *pool;
304
305 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
307 /*! \internal */
308 struct stasis_topic {
309         char *name;
310         /*! Variable length array of the subscribers */
311         AST_VECTOR(, struct stasis_subscription *) subscribers;
312
313         /*! Topics forwarding into this topic */
314         AST_VECTOR(, struct stasis_topic *) upstream_topics;
315 };
316
317 /* Forward declarations for the tightly-coupled subscription object */
318 static int topic_add_subscription(struct stasis_topic *topic,
319         struct stasis_subscription *sub);
320
321 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
322
323 /*! \brief Lock two topics. */
324 #define topic_lock_both(topic1, topic2) \
325         do { \
326                 ao2_lock(topic1); \
327                 while (ao2_trylock(topic2)) { \
328                         AO2_DEADLOCK_AVOIDANCE(topic1); \
329                 } \
330         } while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
374 {
375         return AST_VECTOR_SIZE(&topic->subscribers);
376 }
377
378 /*! \internal */
379 struct stasis_subscription {
380         /*! Unique ID for this subscription */
381         char uniqueid[AST_UUID_STR_LEN];
382         /*! Topic subscribed to. */
383         struct stasis_topic *topic;
384         /*! Mailbox for processing incoming messages. */
385         struct ast_taskprocessor *mailbox;
386         /*! Callback function for incoming message processing. */
387         stasis_subscription_cb callback;
388         /*! Data pointer to be handed to the callback. */
389         void *data;
390
391         /*! Condition for joining with subscription. */
392         ast_cond_t join_cond;
393         /*! Flag set when final message for sub has been received.
394          *  Be sure join_lock is held before reading/setting. */
395         int final_message_rxed;
396         /*! Flag set when final message for sub has been processed.
397          *  Be sure join_lock is held before reading/setting. */
398         int final_message_processed;
399
400         /*! The message types this subscription is accepting */
401         AST_VECTOR(, char) accepted_message_types;
402         /*! The message filter currently in use */
403         enum stasis_subscription_message_filter filter;
404 };
405
406 static void subscription_dtor(void *obj)
407 {
408         struct stasis_subscription *sub = obj;
409
410         /* Subscriptions need to be manually unsubscribed before destruction
411          * b/c there's a cyclic reference between topics and subscriptions */
412         ast_assert(!stasis_subscription_is_subscribed(sub));
413         /* If there are any messages in flight to this subscription; that would
414          * be bad. */
415         ast_assert(stasis_subscription_is_done(sub));
416
417         ao2_cleanup(sub->topic);
418         sub->topic = NULL;
419         ast_taskprocessor_unreference(sub->mailbox);
420         sub->mailbox = NULL;
421         ast_cond_destroy(&sub->join_cond);
422
423         AST_VECTOR_FREE(&sub->accepted_message_types);
424 }
425
426 /*!
427  * \brief Invoke the subscription's callback.
428  * \param sub Subscription to invoke.
429  * \param topic Topic message was published to.
430  * \param message Message to send.
431  */
432 static void subscription_invoke(struct stasis_subscription *sub,
433                                   struct stasis_message *message)
434 {
435         unsigned int final = stasis_subscription_final_message(sub, message);
436         int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
437
438         /* Notify that the final message has been received */
439         if (final) {
440                 ao2_lock(sub);
441                 sub->final_message_rxed = 1;
442                 ast_cond_signal(&sub->join_cond);
443                 ao2_unlock(sub);
444         }
445
446         if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
447                 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
448                 /* Since sub is mostly immutable, no need to lock sub */
449                 sub->callback(sub->data, sub, message);
450         }
451
452         /* Notify that the final message has been processed */
453         if (final) {
454                 ao2_lock(sub);
455                 sub->final_message_processed = 1;
456                 ast_cond_signal(&sub->join_cond);
457                 ao2_unlock(sub);
458         }
459 }
460
461 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
462 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
463
464 void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
465 {
466 }
467
468 struct stasis_subscription *internal_stasis_subscribe(
469         struct stasis_topic *topic,
470         stasis_subscription_cb callback,
471         void *data,
472         int needs_mailbox,
473         int use_thread_pool)
474 {
475         struct stasis_subscription *sub;
476
477         if (!topic) {
478                 return NULL;
479         }
480
481         /* The ao2 lock is used for join_cond. */
482         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
483         if (!sub) {
484                 return NULL;
485         }
486         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
487
488         if (needs_mailbox) {
489                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
490
491                 /* Create name with seq number appended. */
492                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
493                         use_thread_pool ? 'p' : 'm',
494                         stasis_topic_name(topic));
495
496                 /*
497                  * With a small number of subscribers, a thread-per-sub is
498                  * acceptable. For a large number of subscribers, a thread
499                  * pool should be used.
500                  */
501                 if (use_thread_pool) {
502                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
503                 } else {
504                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
505                 }
506                 if (!sub->mailbox) {
507                         ao2_ref(sub, -1);
508
509                         return NULL;
510                 }
511                 ast_taskprocessor_set_local(sub->mailbox, sub);
512                 /* Taskprocessor has a reference */
513                 ao2_ref(sub, +1);
514         }
515
516         ao2_ref(topic, +1);
517         sub->topic = topic;
518         sub->callback = callback;
519         sub->data = data;
520         ast_cond_init(&sub->join_cond, NULL);
521         sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
522         AST_VECTOR_INIT(&sub->accepted_message_types, 0);
523
524         if (topic_add_subscription(topic, sub) != 0) {
525                 ao2_ref(sub, -1);
526
527                 return NULL;
528         }
529         send_subscription_subscribe(topic, sub);
530
531         return sub;
532 }
533
534 struct stasis_subscription *stasis_subscribe(
535         struct stasis_topic *topic,
536         stasis_subscription_cb callback,
537         void *data)
538 {
539         return internal_stasis_subscribe(topic, callback, data, 1, 0);
540 }
541
542 struct stasis_subscription *stasis_subscribe_pool(
543         struct stasis_topic *topic,
544         stasis_subscription_cb callback,
545         void *data)
546 {
547         return internal_stasis_subscribe(topic, callback, data, 1, 1);
548 }
549
550 static int sub_cleanup(void *data)
551 {
552         struct stasis_subscription *sub = data;
553         ao2_cleanup(sub);
554         return 0;
555 }
556
557 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
558 {
559         /* The subscription may be the last ref to this topic. Hold
560          * the topic ref open until after the unlock. */
561         struct stasis_topic *topic;
562
563         if (!sub) {
564                 return NULL;
565         }
566
567         topic = ao2_bump(sub->topic);
568
569         /* We have to remove the subscription first, to ensure the unsubscribe
570          * is the final message */
571         if (topic_remove_subscription(sub->topic, sub) != 0) {
572                 ast_log(LOG_ERROR,
573                         "Internal error: subscription has invalid topic\n");
574                 ao2_cleanup(topic);
575
576                 return NULL;
577         }
578
579         /* Now let everyone know about the unsubscribe */
580         send_subscription_unsubscribe(topic, sub);
581
582         /* When all that's done, remove the ref the mailbox has on the sub */
583         if (sub->mailbox) {
584                 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
585                         /* Nothing we can do here, the conditional is just to keep
586                          * the compiler happy that we're not ignoring the result. */
587                 }
588         }
589
590         /* Unsubscribing unrefs the subscription */
591         ao2_cleanup(sub);
592         ao2_cleanup(topic);
593
594         return NULL;
595 }
596
597 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
598         long low_water, long high_water)
599 {
600         int res = -1;
601
602         if (subscription) {
603                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
604                         low_water, high_water);
605         }
606         return res;
607 }
608
609 int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
610         const struct stasis_message_type *type)
611 {
612         if (!subscription) {
613                 return -1;
614         }
615
616         ast_assert(type != NULL);
617         ast_assert(stasis_message_type_name(type) != NULL);
618
619         if (!type || !stasis_message_type_name(type)) {
620                 /* Filtering is unreliable as this message type is not yet initialized
621                  * so force all messages through.
622                  */
623                 subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
624                 return 0;
625         }
626
627         ao2_lock(subscription->topic);
628         if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
629                 /* We do this for the same reason as above. The subscription can still operate, so allow
630                  * it to do so by forcing all messages through.
631                  */
632                 subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
633         }
634         ao2_unlock(subscription->topic);
635
636         return 0;
637 }
638
639 int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
640         const struct stasis_message_type *type)
641 {
642         if (!subscription) {
643                 return -1;
644         }
645
646         ast_assert(type != NULL);
647         ast_assert(stasis_message_type_name(type) != NULL);
648
649         if (!type || !stasis_message_type_name(type)) {
650                 return 0;
651         }
652
653         ao2_lock(subscription->topic);
654         if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
655                 /* The memory is already allocated so this can't fail */
656                 AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
657         }
658         ao2_unlock(subscription->topic);
659
660         return 0;
661 }
662
663 int stasis_subscription_set_filter(struct stasis_subscription *subscription,
664         enum stasis_subscription_message_filter filter)
665 {
666         if (!subscription) {
667                 return -1;
668         }
669
670         ao2_lock(subscription->topic);
671         if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
672                 subscription->filter = filter;
673         }
674         ao2_unlock(subscription->topic);
675
676         return 0;
677 }
678
679 void stasis_subscription_join(struct stasis_subscription *subscription)
680 {
681         if (subscription) {
682                 ao2_lock(subscription);
683                 /* Wait until the processed flag has been set */
684                 while (!subscription->final_message_processed) {
685                         ast_cond_wait(&subscription->join_cond,
686                                 ao2_object_get_lockaddr(subscription));
687                 }
688                 ao2_unlock(subscription);
689         }
690 }
691
692 int stasis_subscription_is_done(struct stasis_subscription *subscription)
693 {
694         if (subscription) {
695                 int ret;
696
697                 ao2_lock(subscription);
698                 ret = subscription->final_message_rxed;
699                 ao2_unlock(subscription);
700
701                 return ret;
702         }
703
704         /* Null subscription is about as done as you can get */
705         return 1;
706 }
707
708 struct stasis_subscription *stasis_unsubscribe_and_join(
709         struct stasis_subscription *subscription)
710 {
711         if (!subscription) {
712                 return NULL;
713         }
714
715         /* Bump refcount to hold it past the unsubscribe */
716         ao2_ref(subscription, +1);
717         stasis_unsubscribe(subscription);
718         stasis_subscription_join(subscription);
719         /* Now decrement the refcount back */
720         ao2_cleanup(subscription);
721         return NULL;
722 }
723
724 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
725 {
726         if (sub) {
727                 size_t i;
728                 struct stasis_topic *topic = sub->topic;
729
730                 ao2_lock(topic);
731                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
732                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
733                                 ao2_unlock(topic);
734                                 return 1;
735                         }
736                 }
737                 ao2_unlock(topic);
738         }
739
740         return 0;
741 }
742
743 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
744 {
745         return sub->uniqueid;
746 }
747
748 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
749 {
750         struct stasis_subscription_change *change;
751
752         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
753                 return 0;
754         }
755
756         change = stasis_message_data(msg);
757         if (strcmp("Unsubscribe", change->description)) {
758                 return 0;
759         }
760
761         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
762                 return 0;
763         }
764
765         return 1;
766 }
767
768 /*!
769  * \brief Add a subscriber to a topic.
770  * \param topic Topic
771  * \param sub Subscriber
772  * \return 0 on success
773  * \return Non-zero on error
774  */
775 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
776 {
777         size_t idx;
778
779         ao2_lock(topic);
780         /* The reference from the topic to the subscription is shared with
781          * the owner of the subscription, which will explicitly unsubscribe
782          * to release it.
783          *
784          * If we bumped the refcount here, the owner would have to unsubscribe
785          * and cleanup, which is a bit awkward. */
786         AST_VECTOR_APPEND(&topic->subscribers, sub);
787
788         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
789                 topic_add_subscription(
790                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
791         }
792         ao2_unlock(topic);
793
794         return 0;
795 }
796
797 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
798 {
799         size_t idx;
800         int res;
801
802         ao2_lock(topic);
803         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
804                 topic_remove_subscription(
805                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
806         }
807         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
808                 AST_VECTOR_ELEM_CLEANUP_NOOP);
809         ao2_unlock(topic);
810
811         return res;
812 }
813
814 /*!
815  * \internal \brief Dispatch a message to a subscriber asynchronously
816  * \param local \ref ast_taskprocessor_local object
817  * \return 0
818  */
819 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
820 {
821         struct stasis_subscription *sub = local->local_data;
822         struct stasis_message *message = local->data;
823
824         subscription_invoke(sub, message);
825         ao2_cleanup(message);
826
827         return 0;
828 }
829
830 /*!
831  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
832  * a published message to a subscriber
833  */
834 struct sync_task_data {
835         ast_mutex_t lock;
836         ast_cond_t cond;
837         int complete;
838         void *task_data;
839 };
840
841 /*!
842  * \internal \brief Dispatch a message to a subscriber synchronously
843  * \param local \ref ast_taskprocessor_local object
844  * \return 0
845  */
846 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
847 {
848         struct stasis_subscription *sub = local->local_data;
849         struct sync_task_data *std = local->data;
850         struct stasis_message *message = std->task_data;
851
852         subscription_invoke(sub, message);
853         ao2_cleanup(message);
854
855         ast_mutex_lock(&std->lock);
856         std->complete = 1;
857         ast_cond_signal(&std->cond);
858         ast_mutex_unlock(&std->lock);
859
860         return 0;
861 }
862
863 /*!
864  * \internal \brief Dispatch a message to a subscriber
865  * \param sub The subscriber to dispatch to
866  * \param message The message to send
867  * \param synchronous If non-zero, synchronize on the subscriber receiving
868  * the message
869  */
870 static void dispatch_message(struct stasis_subscription *sub,
871         struct stasis_message *message,
872         int synchronous)
873 {
874         /* Determine if this subscription is interested in this message. Note that final
875          * messages are special and are always invoked on the subscription.
876          */
877         if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
878                 int message_type_id = stasis_message_type_id(stasis_message_type(message));
879                 if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
880                         !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
881                         !stasis_subscription_final_message(sub, message)) {
882                         return;
883                 }
884         }
885
886         if (!sub->mailbox) {
887                 /* Dispatch directly */
888                 subscription_invoke(sub, message);
889                 return;
890         }
891
892         /* Bump the message for the taskprocessor push. This will get de-ref'd
893          * by the task processor callback.
894          */
895         ao2_bump(message);
896         if (!synchronous) {
897                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
898                         /* Push failed; ugh. */
899                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
900                         ao2_cleanup(message);
901                 }
902         } else {
903                 struct sync_task_data std;
904
905                 ast_mutex_init(&std.lock);
906                 ast_cond_init(&std.cond, NULL);
907                 std.complete = 0;
908                 std.task_data = message;
909
910                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
911                         /* Push failed; ugh. */
912                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
913                         ao2_cleanup(message);
914                         ast_mutex_destroy(&std.lock);
915                         ast_cond_destroy(&std.cond);
916                         return;
917                 }
918
919                 ast_mutex_lock(&std.lock);
920                 while (!std.complete) {
921                         ast_cond_wait(&std.cond, &std.lock);
922                 }
923                 ast_mutex_unlock(&std.lock);
924
925                 ast_mutex_destroy(&std.lock);
926                 ast_cond_destroy(&std.cond);
927         }
928 }
929
930 /*!
931  * \internal \brief Publish a message to a topic's subscribers
932  * \brief topic The topic to publish to
933  * \brief message The message to publish
934  * \brief sync_sub An optional subscriber of the topic to publish synchronously
935  * to
936  */
937 static void publish_msg(struct stasis_topic *topic,
938         struct stasis_message *message, struct stasis_subscription *sync_sub)
939 {
940         size_t i;
941
942         ast_assert(topic != NULL);
943         ast_assert(message != NULL);
944
945         /* If there are no subscribers don't bother */
946         if (!stasis_topic_subscribers(topic)) {
947                 return;
948         }
949
950         /*
951          * The topic may be unref'ed by the subscription invocation.
952          * Make sure we hold onto a reference while dispatching.
953          */
954         ao2_ref(topic, +1);
955         ao2_lock(topic);
956         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
957                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
958
959                 ast_assert(sub != NULL);
960
961                 dispatch_message(sub, message, (sub == sync_sub));
962         }
963         ao2_unlock(topic);
964         ao2_ref(topic, -1);
965 }
966
967 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
968 {
969         publish_msg(topic, message, NULL);
970 }
971
972 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
973 {
974         ast_assert(sub != NULL);
975
976         publish_msg(sub->topic, message, sub);
977 }
978
979 /*!
980  * \brief Forwarding information
981  *
982  * Any message posted to \a from_topic is forwarded to \a to_topic.
983  *
984  * In cases where both the \a from_topic and \a to_topic need to be locked,
985  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
986  */
987 struct stasis_forward {
988         /*! Originating topic */
989         struct stasis_topic *from_topic;
990         /*! Destination topic */
991         struct stasis_topic *to_topic;
992 };
993
994 static void forward_dtor(void *obj)
995 {
996         struct stasis_forward *forward = obj;
997
998         ao2_cleanup(forward->from_topic);
999         forward->from_topic = NULL;
1000         ao2_cleanup(forward->to_topic);
1001         forward->to_topic = NULL;
1002 }
1003
1004 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
1005 {
1006         int idx;
1007         struct stasis_topic *from;
1008         struct stasis_topic *to;
1009
1010         if (!forward) {
1011                 return NULL;
1012         }
1013
1014         from = forward->from_topic;
1015         to = forward->to_topic;
1016
1017         if (from && to) {
1018                 topic_lock_both(to, from);
1019                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1020                         AST_VECTOR_ELEM_CLEANUP_NOOP);
1021
1022                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1023                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1024                 }
1025                 ao2_unlock(from);
1026                 ao2_unlock(to);
1027         }
1028
1029         ao2_cleanup(forward);
1030
1031         return NULL;
1032 }
1033
1034 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
1035         struct stasis_topic *to_topic)
1036 {
1037         int res;
1038         size_t idx;
1039         struct stasis_forward *forward;
1040
1041         if (!from_topic || !to_topic) {
1042                 return NULL;
1043         }
1044
1045         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1046         if (!forward) {
1047                 return NULL;
1048         }
1049
1050         /* Forwards to ourselves are implicit. */
1051         if (to_topic == from_topic) {
1052                 return forward;
1053         }
1054
1055         forward->from_topic = ao2_bump(from_topic);
1056         forward->to_topic = ao2_bump(to_topic);
1057
1058         topic_lock_both(to_topic, from_topic);
1059         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1060         if (res != 0) {
1061                 ao2_unlock(from_topic);
1062                 ao2_unlock(to_topic);
1063                 ao2_ref(forward, -1);
1064                 return NULL;
1065         }
1066
1067         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1068                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1069         }
1070         ao2_unlock(from_topic);
1071         ao2_unlock(to_topic);
1072
1073         return forward;
1074 }
1075
1076 static void subscription_change_dtor(void *obj)
1077 {
1078         struct stasis_subscription_change *change = obj;
1079
1080         ao2_cleanup(change->topic);
1081 }
1082
1083 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
1084 {
1085         size_t description_len = strlen(description) + 1;
1086         struct stasis_subscription_change *change;
1087
1088         change = ao2_alloc_options(sizeof(*change) + description_len + strlen(uniqueid) + 1,
1089                 subscription_change_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1090         if (!change) {
1091                 return NULL;
1092         }
1093
1094         strcpy(change->description, description); /* SAFE */
1095         change->uniqueid = change->description + description_len;
1096         strcpy(change->uniqueid, uniqueid); /* SAFE */
1097         ao2_ref(topic, +1);
1098         change->topic = topic;
1099
1100         return change;
1101 }
1102
1103 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
1104 {
1105         struct stasis_subscription_change *change;
1106         struct stasis_message *msg;
1107
1108         /* This assumes that we have already unsubscribed */
1109         ast_assert(stasis_subscription_is_subscribed(sub));
1110
1111         if (!stasis_subscription_change_type()) {
1112                 return;
1113         }
1114
1115         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1116         if (!change) {
1117                 return;
1118         }
1119
1120         msg = stasis_message_create(stasis_subscription_change_type(), change);
1121         if (!msg) {
1122                 ao2_cleanup(change);
1123                 return;
1124         }
1125
1126         stasis_publish(topic, msg);
1127         ao2_cleanup(msg);
1128         ao2_cleanup(change);
1129 }
1130
1131 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1132         struct stasis_subscription *sub)
1133 {
1134         struct stasis_subscription_change *change;
1135         struct stasis_message *msg;
1136
1137         /* This assumes that we have already unsubscribed */
1138         ast_assert(!stasis_subscription_is_subscribed(sub));
1139
1140         if (!stasis_subscription_change_type()) {
1141                 return;
1142         }
1143
1144         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1145         if (!change) {
1146                 return;
1147         }
1148
1149         msg = stasis_message_create(stasis_subscription_change_type(), change);
1150         if (!msg) {
1151                 ao2_cleanup(change);
1152                 return;
1153         }
1154
1155         stasis_publish(topic, msg);
1156
1157         /* Now we have to dispatch to the subscription itself */
1158         dispatch_message(sub, msg, 0);
1159
1160         ao2_cleanup(msg);
1161         ao2_cleanup(change);
1162 }
1163
1164 struct topic_pool_entry {
1165         struct stasis_forward *forward;
1166         struct stasis_topic *topic;
1167 };
1168
1169 static void topic_pool_entry_dtor(void *obj)
1170 {
1171         struct topic_pool_entry *entry = obj;
1172
1173         entry->forward = stasis_forward_cancel(entry->forward);
1174         ao2_cleanup(entry->topic);
1175         entry->topic = NULL;
1176 }
1177
1178 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1179 {
1180         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1181                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1182 }
1183
1184 struct stasis_topic_pool {
1185         struct ao2_container *pool_container;
1186         struct stasis_topic *pool_topic;
1187 };
1188
1189 static void topic_pool_dtor(void *obj)
1190 {
1191         struct stasis_topic_pool *pool = obj;
1192
1193 #ifdef AO2_DEBUG
1194         {
1195                 char *container_name =
1196                         ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1197                 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1198                 ao2_container_unregister(container_name);
1199         }
1200 #endif
1201
1202         ao2_cleanup(pool->pool_container);
1203         pool->pool_container = NULL;
1204         ao2_cleanup(pool->pool_topic);
1205         pool->pool_topic = NULL;
1206 }
1207
1208 static int topic_pool_entry_hash(const void *obj, const int flags)
1209 {
1210         const struct topic_pool_entry *object;
1211         const char *key;
1212
1213         switch (flags & OBJ_SEARCH_MASK) {
1214         case OBJ_SEARCH_KEY:
1215                 key = obj;
1216                 break;
1217         case OBJ_SEARCH_OBJECT:
1218                 object = obj;
1219                 key = stasis_topic_name(object->topic);
1220                 break;
1221         default:
1222                 /* Hash can only work on something with a full key. */
1223                 ast_assert(0);
1224                 return 0;
1225         }
1226         return ast_str_case_hash(key);
1227 }
1228
1229 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1230 {
1231         const struct topic_pool_entry *object_left = obj;
1232         const struct topic_pool_entry *object_right = arg;
1233         const char *right_key = arg;
1234         int cmp;
1235
1236         switch (flags & OBJ_SEARCH_MASK) {
1237         case OBJ_SEARCH_OBJECT:
1238                 right_key = stasis_topic_name(object_right->topic);
1239                 /* Fall through */
1240         case OBJ_SEARCH_KEY:
1241                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1242                 break;
1243         case OBJ_SEARCH_PARTIAL_KEY:
1244                 /* Not supported by container */
1245                 ast_assert(0);
1246                 cmp = -1;
1247                 break;
1248         default:
1249                 /*
1250                  * What arg points to is specific to this traversal callback
1251                  * and has no special meaning to astobj2.
1252                  */
1253                 cmp = 0;
1254                 break;
1255         }
1256         if (cmp) {
1257                 return 0;
1258         }
1259         /*
1260          * At this point the traversal callback is identical to a sorted
1261          * container.
1262          */
1263         return CMP_MATCH;
1264 }
1265
1266 #ifdef AO2_DEBUG
1267 static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1268 {
1269         struct topic_pool_entry *entry = v_obj;
1270
1271         if (!entry) {
1272                 return;
1273         }
1274         prnt(where, "%s", stasis_topic_name(entry->topic));
1275 }
1276 #endif
1277
1278 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1279 {
1280         struct stasis_topic_pool *pool;
1281
1282         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1283         if (!pool) {
1284                 return NULL;
1285         }
1286
1287         pool->pool_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
1288                 TOPIC_POOL_BUCKETS, topic_pool_entry_hash, NULL, topic_pool_entry_cmp);
1289         if (!pool->pool_container) {
1290                 ao2_cleanup(pool);
1291                 return NULL;
1292         }
1293
1294 #ifdef AO2_DEBUG
1295         {
1296                 char *container_name =
1297                         ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1298                 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1299                 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1300         }
1301 #endif
1302
1303         ao2_ref(pooled_topic, +1);
1304         pool->pool_topic = pooled_topic;
1305
1306         return pool;
1307 }
1308
1309 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1310 {
1311         ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1312 }
1313
1314 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1315 {
1316         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1317         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1318
1319         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1320         if (topic_pool_entry) {
1321                 return topic_pool_entry->topic;
1322         }
1323
1324         topic_pool_entry = topic_pool_entry_alloc();
1325         if (!topic_pool_entry) {
1326                 return NULL;
1327         }
1328
1329         topic_pool_entry->topic = stasis_topic_create(topic_name);
1330         if (!topic_pool_entry->topic) {
1331                 return NULL;
1332         }
1333
1334         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1335         if (!topic_pool_entry->forward) {
1336                 return NULL;
1337         }
1338
1339         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1340                 return NULL;
1341         }
1342
1343         return topic_pool_entry->topic;
1344 }
1345
1346 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1347 {
1348         struct topic_pool_entry *topic_pool_entry;
1349
1350         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1351         if (!topic_pool_entry) {
1352                 return 0;
1353         }
1354
1355         ao2_ref(topic_pool_entry, -1);
1356         return 1;
1357 }
1358
1359 void stasis_log_bad_type_access(const char *name)
1360 {
1361 #ifdef AST_DEVMODE
1362         if (!stasis_message_type_declined(name)) {
1363                 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1364         }
1365 #endif
1366 }
1367
1368 /*! \brief A multi object blob data structure to carry user event stasis messages */
1369 struct ast_multi_object_blob {
1370         struct ast_json *blob;                             /*< A blob of JSON data */
1371         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1372 };
1373
1374 /*!
1375  * \internal
1376  * \brief Destructor for \ref ast_multi_object_blob objects
1377  */
1378 static void multi_object_blob_dtor(void *obj)
1379 {
1380         struct ast_multi_object_blob *multi = obj;
1381         int type;
1382         int i;
1383
1384         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1385                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1386                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1387                 }
1388                 AST_VECTOR_FREE(&multi->snapshots[type]);
1389         }
1390         ast_json_unref(multi->blob);
1391 }
1392
1393 /*! \brief Create a stasis user event multi object blob */
1394 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1395 {
1396         int type;
1397         struct ast_multi_object_blob *multi;
1398
1399         ast_assert(blob != NULL);
1400
1401         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1402         if (!multi) {
1403                 return NULL;
1404         }
1405
1406         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1407                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1408                         ao2_ref(multi, -1);
1409
1410                         return NULL;
1411                 }
1412         }
1413
1414         multi->blob = ast_json_ref(blob);
1415
1416         return multi;
1417 }
1418
1419 /*! \brief Add an object (snapshot) to the blob */
1420 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1421         enum stasis_user_multi_object_snapshot_type type, void *object)
1422 {
1423         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1424                 ao2_cleanup(object);
1425         }
1426 }
1427
1428 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1429 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1430         struct stasis_message_type *type, struct ast_json *blob)
1431 {
1432         struct stasis_message *message;
1433         struct ast_channel_snapshot *channel_snapshot;
1434         struct ast_multi_object_blob *multi;
1435
1436         if (!type) {
1437                 return;
1438         }
1439
1440         multi = ast_multi_object_blob_create(blob);
1441         if (!multi) {
1442                 return;
1443         }
1444
1445         channel_snapshot = ast_channel_snapshot_create(chan);
1446         if (!channel_snapshot) {
1447                 ao2_ref(multi, -1);
1448                 return;
1449         }
1450
1451         /* this call steals the channel_snapshot reference */
1452         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1453
1454         message = stasis_message_create(type, multi);
1455         ao2_ref(multi, -1);
1456         if (message) {
1457                 /* app_userevent still publishes to channel */
1458                 stasis_publish(ast_channel_topic(chan), message);
1459                 ao2_ref(message, -1);
1460         }
1461 }
1462
1463 /*! \internal \brief convert multi object blob to ari json */
1464 static struct ast_json *multi_user_event_to_json(
1465         struct stasis_message *message,
1466         const struct stasis_message_sanitizer *sanitize)
1467 {
1468         struct ast_json *out;
1469         struct ast_multi_object_blob *multi = stasis_message_data(message);
1470         struct ast_json *blob = multi->blob;
1471         const struct timeval *tv = stasis_message_timestamp(message);
1472         enum stasis_user_multi_object_snapshot_type type;
1473         int i;
1474
1475         out = ast_json_object_create();
1476         if (!out) {
1477                 return NULL;
1478         }
1479
1480         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1481         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1482         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1483         ast_json_object_set(out, "userevent", ast_json_ref(blob));
1484
1485         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1486                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1487                         struct ast_json *json_object = NULL;
1488                         char *name = NULL;
1489                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1490
1491                         switch (type) {
1492                         case STASIS_UMOS_CHANNEL:
1493                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1494                                 name = "channel";
1495                                 break;
1496                         case STASIS_UMOS_BRIDGE:
1497                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1498                                 name = "bridge";
1499                                 break;
1500                         case STASIS_UMOS_ENDPOINT:
1501                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1502                                 name = "endpoint";
1503                                 break;
1504                         }
1505                         if (json_object) {
1506                                 ast_json_object_set(out, name, json_object);
1507                         }
1508                 }
1509         }
1510
1511         return out;
1512 }
1513
1514 /*! \internal \brief convert multi object blob to ami string */
1515 static struct ast_str *multi_object_blob_to_ami(void *obj)
1516 {
1517         struct ast_str *ami_str=ast_str_create(1024);
1518         struct ast_str *ami_snapshot;
1519         const struct ast_multi_object_blob *multi = obj;
1520         enum stasis_user_multi_object_snapshot_type type;
1521         int i;
1522
1523         if (!ami_str) {
1524                 return NULL;
1525         }
1526         if (!multi) {
1527                 ast_free(ami_str);
1528                 return NULL;
1529         }
1530
1531         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1532                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1533                         char *name = NULL;
1534                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1535                         ami_snapshot = NULL;
1536
1537                         if (i > 0) {
1538                                 ast_asprintf(&name, "%d", i + 1);
1539                         }
1540
1541                         switch (type) {
1542                         case STASIS_UMOS_CHANNEL:
1543                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1544                                 break;
1545
1546                         case STASIS_UMOS_BRIDGE:
1547                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1548                                 break;
1549
1550                         case STASIS_UMOS_ENDPOINT:
1551                                 /* currently not sending endpoint snapshots to AMI */
1552                                 break;
1553                         }
1554                         if (ami_snapshot) {
1555                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1556                                 ast_free(ami_snapshot);
1557                         }
1558                         ast_free(name);
1559                 }
1560         }
1561
1562         return ami_str;
1563 }
1564
1565 /*! \internal \brief Callback to pass only user defined parameters from blob */
1566 static int userevent_exclusion_cb(const char *key)
1567 {
1568         if (!strcmp("eventname", key)) {
1569                 return 1;
1570         }
1571         return 0;
1572 }
1573
1574 static struct ast_manager_event_blob *multi_user_event_to_ami(
1575         struct stasis_message *message)
1576 {
1577         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1578         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1579         struct ast_multi_object_blob *multi = stasis_message_data(message);
1580         const char *eventname;
1581
1582         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1583         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1584         object_string = multi_object_blob_to_ami(multi);
1585         if (!object_string || !body) {
1586                 return NULL;
1587         }
1588
1589         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1590                 "%s"
1591                 "UserEvent: %s\r\n"
1592                 "%s",
1593                 ast_str_buffer(object_string),
1594                 eventname,
1595                 ast_str_buffer(body));
1596 }
1597
1598 /*! \brief A structure to hold global configuration-related options */
1599 struct stasis_declined_config {
1600         /*! The list of message types to decline */
1601         struct ao2_container *declined;
1602 };
1603
1604 /*! \brief Threadpool configuration options */
1605 struct stasis_threadpool_conf {
1606         /*! Initial size of the thread pool */
1607         int initial_size;
1608         /*! Time, in seconds, before we expire a thread */
1609         int idle_timeout_sec;
1610         /*! Maximum number of thread to allow */
1611         int max_size;
1612 };
1613
1614 struct stasis_config {
1615         /*! Thread pool configuration options */
1616         struct stasis_threadpool_conf *threadpool_options;
1617         /*! Declined message types */
1618         struct stasis_declined_config *declined_message_types;
1619 };
1620
1621 static struct aco_type threadpool_option = {
1622         .type = ACO_GLOBAL,
1623         .name = "threadpool",
1624         .item_offset = offsetof(struct stasis_config, threadpool_options),
1625         .category = "threadpool",
1626         .category_match = ACO_WHITELIST_EXACT,
1627 };
1628
1629 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1630
1631 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1632 static struct aco_type declined_option = {
1633         .type = ACO_GLOBAL,
1634         .name = "declined_message_types",
1635         .item_offset = offsetof(struct stasis_config, declined_message_types),
1636         .category_match = ACO_WHITELIST_EXACT,
1637         .category = "declined_message_types",
1638 };
1639
1640 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1641
1642 struct aco_file stasis_conf = {
1643         .filename = "stasis.conf",
1644         .types = ACO_TYPES(&declined_option, &threadpool_option),
1645 };
1646
1647 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1648 static AO2_GLOBAL_OBJ_STATIC(globals);
1649
1650 static void *stasis_config_alloc(void);
1651
1652 /*! \brief Register information about the configs being processed by this module */
1653 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1654         .files = ACO_FILES(&stasis_conf),
1655 );
1656
1657 static void stasis_declined_config_destructor(void *obj)
1658 {
1659         struct stasis_declined_config *declined = obj;
1660
1661         ao2_cleanup(declined->declined);
1662 }
1663
1664 static void stasis_config_destructor(void *obj)
1665 {
1666         struct stasis_config *cfg = obj;
1667
1668         ao2_cleanup(cfg->declined_message_types);
1669         ast_free(cfg->threadpool_options);
1670 }
1671
1672 static void *stasis_config_alloc(void)
1673 {
1674         struct stasis_config *cfg;
1675
1676         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1677                 return NULL;
1678         }
1679
1680         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1681         if (!cfg->threadpool_options) {
1682                 ao2_ref(cfg, -1);
1683                 return NULL;
1684         }
1685
1686         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1687                 stasis_declined_config_destructor);
1688         if (!cfg->declined_message_types) {
1689                 ao2_ref(cfg, -1);
1690                 return NULL;
1691         }
1692
1693         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1694         if (!cfg->declined_message_types->declined) {
1695                 ao2_ref(cfg, -1);
1696                 return NULL;
1697         }
1698
1699         return cfg;
1700 }
1701
1702 int stasis_message_type_declined(const char *name)
1703 {
1704         struct stasis_config *cfg = ao2_global_obj_ref(globals);
1705         char *name_in_declined;
1706         int res;
1707
1708         if (!cfg || !cfg->declined_message_types) {
1709                 ao2_cleanup(cfg);
1710                 return 0;
1711         }
1712
1713         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1714         res = name_in_declined ? 1 : 0;
1715         ao2_cleanup(name_in_declined);
1716         ao2_ref(cfg, -1);
1717         if (res) {
1718                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1719         }
1720         return res;
1721 }
1722
1723 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1724 {
1725         struct stasis_declined_config *declined = obj;
1726
1727         if (ast_strlen_zero(var->value)) {
1728                 return 0;
1729         }
1730
1731         if (ast_str_container_add(declined->declined, var->value)) {
1732                 return -1;
1733         }
1734
1735         return 0;
1736 }
1737
1738 /*!
1739  * @{ \brief Define multi user event message type(s).
1740  */
1741
1742 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1743         .to_json = multi_user_event_to_json,
1744         .to_ami = multi_user_event_to_ami,
1745         );
1746
1747 /*! @} */
1748
1749 /*! \brief Cleanup function for graceful shutdowns */
1750 static void stasis_cleanup(void)
1751 {
1752         ast_threadpool_shutdown(pool);
1753         pool = NULL;
1754         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1755         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1756         aco_info_destroy(&cfg_info);
1757         ao2_global_obj_release(globals);
1758 }
1759
1760 int stasis_init(void)
1761 {
1762         struct stasis_config *cfg;
1763         int cache_init;
1764         struct ast_threadpool_options threadpool_opts = { 0, };
1765
1766         /* Be sure the types are cleaned up after the message bus */
1767         ast_register_cleanup(stasis_cleanup);
1768
1769         if (aco_info_init(&cfg_info)) {
1770                 return -1;
1771         }
1772
1773         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1774                 declined_options, "", declined_handler, 0);
1775         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1776                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1777                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1778                 INT_MAX);
1779         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1780                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1781                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1782                 INT_MAX);
1783         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1784                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1785                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1786                 INT_MAX);
1787
1788         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1789                 struct stasis_config *default_cfg = stasis_config_alloc();
1790
1791                 if (!default_cfg) {
1792                         return -1;
1793                 }
1794
1795                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1796                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1797                         ao2_ref(default_cfg, -1);
1798
1799                         return -1;
1800                 }
1801
1802                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1803                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1804                         ao2_ref(default_cfg, -1);
1805
1806                         return -1;
1807                 }
1808
1809                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1810                 ao2_global_obj_replace_unref(globals, default_cfg);
1811                 cfg = default_cfg;
1812         } else {
1813                 cfg = ao2_global_obj_ref(globals);
1814                 if (!cfg) {
1815                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1816
1817                         return -1;
1818                 }
1819         }
1820
1821         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1822         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1823         threadpool_opts.auto_increment = 1;
1824         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1825         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1826         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1827         ao2_ref(cfg, -1);
1828         if (!pool) {
1829                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1830
1831                 return -1;
1832         }
1833
1834         cache_init = stasis_cache_init();
1835         if (cache_init != 0) {
1836                 return -1;
1837         }
1838
1839         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1840                 return -1;
1841         }
1842         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1843                 return -1;
1844         }
1845
1846         return 0;
1847 }