93112d98ebb9334c9a48771b05243853eb47052a
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_internal.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
42 #include "asterisk/stasis_endpoints.h"
43 #include "asterisk/config_options.h"
44
45 /*** DOCUMENTATION
46         <managerEvent language="en_US" name="UserEvent">
47                 <managerEventInstance class="EVENT_FLAG_USER">
48                         <synopsis>A user defined event raised from the dialplan.</synopsis>
49                         <syntax>
50                                 <channel_snapshot/>
51                                 <parameter name="UserEvent">
52                                         <para>The event name, as specified in the dialplan.</para>
53                                 </parameter>
54                         </syntax>
55                         <description>
56                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
57                         </description>
58                         <see-also>
59                                 <ref type="application">UserEvent</ref>
60                                 <ref type="managerEvent">UserEvent</ref>
61                         </see-also>
62                 </managerEventInstance>
63         </managerEvent>
64         <configInfo name="stasis" language="en_US">
65                 <configFile name="stasis.conf">
66                         <configObject name="threadpool">
67                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
68                                 <configOption name="initial_size" default="5">
69                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
70                                 </configOption>
71                                 <configOption name="idle_timeout_sec" default="20">
72                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
73                                 </configOption>
74                                 <configOption name="max_size" default="50">
75                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
76                                 </configOption>
77                         </configObject>
78                         <configObject name="declined_message_types">
79                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
80                                 <configOption name="decline">
81                                         <synopsis>The message type to decline.</synopsis>
82                                         <description>
83                                                 <para>This configuration option defines the name of the Stasis
84                                                 message type that Asterisk is forbidden from creating and can be
85                                                 specified as many times as necessary to achieve the desired result.</para>
86                                                 <enumlist>
87                                                         <enum name="stasis_app_recording_snapshot_type" />
88                                                         <enum name="stasis_app_playback_snapshot_type" />
89                                                         <enum name="stasis_test_message_type" />
90                                                         <enum name="confbridge_start_type" />
91                                                         <enum name="confbridge_end_type" />
92                                                         <enum name="confbridge_join_type" />
93                                                         <enum name="confbridge_leave_type" />
94                                                         <enum name="confbridge_start_record_type" />
95                                                         <enum name="confbridge_stop_record_type" />
96                                                         <enum name="confbridge_mute_type" />
97                                                         <enum name="confbridge_unmute_type" />
98                                                         <enum name="confbridge_talking_type" />
99                                                         <enum name="cel_generic_type" />
100                                                         <enum name="ast_bridge_snapshot_type" />
101                                                         <enum name="ast_bridge_merge_message_type" />
102                                                         <enum name="ast_channel_entered_bridge_type" />
103                                                         <enum name="ast_channel_left_bridge_type" />
104                                                         <enum name="ast_blind_transfer_type" />
105                                                         <enum name="ast_attended_transfer_type" />
106                                                         <enum name="ast_endpoint_snapshot_type" />
107                                                         <enum name="ast_endpoint_state_type" />
108                                                         <enum name="ast_device_state_message_type" />
109                                                         <enum name="ast_test_suite_message_type" />
110                                                         <enum name="ast_mwi_state_type" />
111                                                         <enum name="ast_mwi_vm_app_type" />
112                                                         <enum name="ast_format_register_type" />
113                                                         <enum name="ast_format_unregister_type" />
114                                                         <enum name="ast_manager_get_generic_type" />
115                                                         <enum name="ast_parked_call_type" />
116                                                         <enum name="ast_channel_snapshot_type" />
117                                                         <enum name="ast_channel_dial_type" />
118                                                         <enum name="ast_channel_varset_type" />
119                                                         <enum name="ast_channel_hangup_request_type" />
120                                                         <enum name="ast_channel_dtmf_begin_type" />
121                                                         <enum name="ast_channel_dtmf_end_type" />
122                                                         <enum name="ast_channel_hold_type" />
123                                                         <enum name="ast_channel_unhold_type" />
124                                                         <enum name="ast_channel_chanspy_start_type" />
125                                                         <enum name="ast_channel_chanspy_stop_type" />
126                                                         <enum name="ast_channel_fax_type" />
127                                                         <enum name="ast_channel_hangup_handler_type" />
128                                                         <enum name="ast_channel_moh_start_type" />
129                                                         <enum name="ast_channel_moh_stop_type" />
130                                                         <enum name="ast_channel_monitor_start_type" />
131                                                         <enum name="ast_channel_monitor_stop_type" />
132                                                         <enum name="ast_channel_agent_login_type" />
133                                                         <enum name="ast_channel_agent_logoff_type" />
134                                                         <enum name="ast_channel_talking_start" />
135                                                         <enum name="ast_channel_talking_stop" />
136                                                         <enum name="ast_security_event_type" />
137                                                         <enum name="ast_named_acl_change_type" />
138                                                         <enum name="ast_local_bridge_type" />
139                                                         <enum name="ast_local_optimization_begin_type" />
140                                                         <enum name="ast_local_optimization_end_type" />
141                                                         <enum name="stasis_subscription_change_type" />
142                                                         <enum name="ast_multi_user_event_type" />
143                                                         <enum name="stasis_cache_clear_type" />
144                                                         <enum name="stasis_cache_update_type" />
145                                                         <enum name="ast_network_change_type" />
146                                                         <enum name="ast_system_registry_type" />
147                                                         <enum name="ast_cc_available_type" />
148                                                         <enum name="ast_cc_offertimerstart_type" />
149                                                         <enum name="ast_cc_requested_type" />
150                                                         <enum name="ast_cc_requestacknowledged_type" />
151                                                         <enum name="ast_cc_callerstopmonitoring_type" />
152                                                         <enum name="ast_cc_callerstartmonitoring_type" />
153                                                         <enum name="ast_cc_callerrecalling_type" />
154                                                         <enum name="ast_cc_recallcomplete_type" />
155                                                         <enum name="ast_cc_failure_type" />
156                                                         <enum name="ast_cc_monitorfailed_type" />
157                                                         <enum name="ast_presence_state_message_type" />
158                                                         <enum name="ast_rtp_rtcp_sent_type" />
159                                                         <enum name="ast_rtp_rtcp_received_type" />
160                                                         <enum name="ast_call_pickup_type" />
161                                                         <enum name="aoc_s_type" />
162                                                         <enum name="aoc_d_type" />
163                                                         <enum name="aoc_e_type" />
164                                                         <enum name="dahdichannel_type" />
165                                                         <enum name="mcid_type" />
166                                                         <enum name="session_timeout_type" />
167                                                         <enum name="cdr_read_message_type" />
168                                                         <enum name="cdr_write_message_type" />
169                                                         <enum name="cdr_prop_write_message_type" />
170                                                         <enum name="corosync_ping_message_type" />
171                                                         <enum name="agi_exec_start_type" />
172                                                         <enum name="agi_exec_end_type" />
173                                                         <enum name="agi_async_start_type" />
174                                                         <enum name="agi_async_exec_type" />
175                                                         <enum name="agi_async_end_type" />
176                                                         <enum name="queue_caller_join_type" />
177                                                         <enum name="queue_caller_leave_type" />
178                                                         <enum name="queue_caller_abandon_type" />
179                                                         <enum name="queue_member_status_type" />
180                                                         <enum name="queue_member_added_type" />
181                                                         <enum name="queue_member_removed_type" />
182                                                         <enum name="queue_member_pause_type" />
183                                                         <enum name="queue_member_penalty_type" />
184                                                         <enum name="queue_member_ringinuse_type" />
185                                                         <enum name="queue_agent_called_type" />
186                                                         <enum name="queue_agent_connect_type" />
187                                                         <enum name="queue_agent_complete_type" />
188                                                         <enum name="queue_agent_dump_type" />
189                                                         <enum name="queue_agent_ringnoanswer_type" />
190                                                         <enum name="meetme_join_type" />
191                                                         <enum name="meetme_leave_type" />
192                                                         <enum name="meetme_end_type" />
193                                                         <enum name="meetme_mute_type" />
194                                                         <enum name="meetme_talking_type" />
195                                                         <enum name="meetme_talk_request_type" />
196                                                         <enum name="appcdr_message_type" />
197                                                         <enum name="forkcdr_message_type" />
198                                                         <enum name="cdr_sync_message_type" />
199                                                 </enumlist>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                 </configFile>
204         </configInfo>
205 ***/
206
207 /*!
208  * \page stasis-impl Stasis Implementation Notes
209  *
210  * \par Reference counting
211  *
212  * Stasis introduces a number of objects, which are tightly related to one
213  * another. Because we rely on ref-counting for memory management, understanding
214  * these relationships is important to understanding this code.
215  *
216  * \code{.txt}
217  *
218  *   stasis_topic <----> stasis_subscription
219  *             ^          ^
220  *              \        /
221  *               \      /
222  *               dispatch
223  *                  |
224  *                  |
225  *                  v
226  *            stasis_message
227  *                  |
228  *                  |
229  *                  v
230  *          stasis_message_type
231  *
232  * \endcode
233  *
234  * The most troubling thing in this chart is the cyclic reference between
235  * stasis_topic and stasis_subscription. This is both unfortunate, and
236  * necessary. Topics need the subscription in order to dispatch messages;
237  * subscriptions need the topics to unsubscribe and check subscription status.
238  *
239  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
240  * topic's reference to a subscription. When the subscription is destroyed, it
241  * will remove its reference to the topic.
242  *
243  * This means that until a subscription has been explicitly unsubscribed, it will
244  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
245  * The destructors of both have assertions regarding this to catch ref-counting
246  * problems where a subscription or topic has had an extra ao2_cleanup().
247  *
248  * The \ref dispatch object is a transient object, which is posted to a
249  * subscription's taskprocessor to send a message to the subscriber. They have
250  * short life cycles, allocated on one thread, destroyed on another.
251  *
252  * During shutdown, or the deletion of a domain object, there are a flurry of
253  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
254  * are processed. Any one of these cleanups could be the one to actually destroy
255  * a given object, so care must be taken to ensure that an object isn't
256  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
257  * that might happen when a RAII_VAR() goes out of scope.
258  *
259  * \par Typical life cycles
260  *
261  *  \li stasis_topic - There are several topics which live for the duration of
262  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
263  *      are actually fed by shorter-lived topics whose lifetime is associated
264  *      with some domain object (like ast_channel_topic() for a given
265  *      ast_channel).
266  *
267  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
268  *      topics, for similar reasons.
269  *
270  *  \li dispatch - Very short lived; just long enough to post a message to a
271  *      subscriber.
272  *
273  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
274  *      irrelevant. Messages are strictly data and have no behavior associated
275  *      with them, so it doesn't really matter if/when they are destroyed. By
276  *      design, a component could hold a ref to a message forever without any
277  *      ill consequences (aside from consuming more memory).
278  *
279  *  \li stasis_message_type - Long life cycles, typically only destroyed on
280  *      module unloading or _clean_ process exit.
281  *
282  * \par Subscriber shutdown sequencing
283  *
284  * Subscribers are sensitive to shutdown sequencing, specifically in how they
285  * reference message types. This is fully detailed on the wiki at
286  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
287  *
288  * In short, the lifetime of the \a data (and \a callback, if in a module) must
289  * be held until the stasis_subscription_final_message() has been received.
290  * Depending on the structure of the subscriber code, this can be handled by
291  * using stasis_subscription_final_message() to free resources on the final
292  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
293  * block until the unsubscribe has completed.
294  */
295
/*!
 * Initial size of the subscribers list.
 *
 * Passed to AST_VECTOR_INIT() for a topic's subscribers vector when the
 * topic is created.
 */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57

/*!
 * Thread pool handed to ast_threadpool_serializer() for the mailbox of any
 * subscription created with use_thread_pool set, i.e. one that does not get
 * its own taskprocessor from ast_taskprocessor_get().
 */
static struct ast_threadpool *pool;

/*!
 * Message type for subscription change notifications. subscription_invoke()
 * looks up its id when deciding whether a filtered subscription should see
 * its final message.
 */
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
306
/*!
 * \internal
 * \brief A named topic that subscriptions attach to.
 *
 * Allocated as an ao2 object by stasis_topic_create() and torn down by
 * topic_dtor().
 */
struct stasis_topic {
        /*! Heap copy of the topic name (ast_strdup() on create, ast_free() in the destructor) */
        char *name;
        /*! Variable length array of the subscribers */
        AST_VECTOR(, struct stasis_subscription *) subscribers;

        /*! Topics forwarding into this topic */
        AST_VECTOR(, struct stasis_topic *) upstream_topics;
};
316
317 /* Forward declarations for the tightly-coupled subscription object */
318 static int topic_add_subscription(struct stasis_topic *topic,
319         struct stasis_subscription *sub);
320
321 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
322
/*!
 * \brief Lock two topics.
 *
 * \a topic1 is locked unconditionally. \a topic2 is then acquired with
 * ao2_trylock(); each time that attempt fails, AO2_DEADLOCK_AVOIDANCE() is
 * run on \a topic1 before trying again.
 */
#define topic_lock_both(topic1, topic2) \
        do { \
                ao2_lock(topic1); \
                for (;;) { \
                        if (!ao2_trylock(topic2)) { \
                                break; \
                        } \
                        AO2_DEADLOCK_AVOIDANCE(topic1); \
                } \
        } while (0)
331
332 static void topic_dtor(void *obj)
333 {
334         struct stasis_topic *topic = obj;
335
336         /* Subscribers hold a reference to topics, so they should all be
337          * unsubscribed before we get here. */
338         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
339
340         ast_free(topic->name);
341         topic->name = NULL;
342
343         AST_VECTOR_FREE(&topic->subscribers);
344         AST_VECTOR_FREE(&topic->upstream_topics);
345 }
346
347 struct stasis_topic *stasis_topic_create(const char *name)
348 {
349         struct stasis_topic *topic;
350         int res = 0;
351
352         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
353         if (!topic) {
354                 return NULL;
355         }
356
357         topic->name = ast_strdup(name);
358         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
359         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
360         if (!topic->name || res) {
361                 ao2_cleanup(topic);
362                 return NULL;
363         }
364
365         return topic;
366 }
367
368 const char *stasis_topic_name(const struct stasis_topic *topic)
369 {
370         return topic->name;
371 }
372
373 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
374 {
375         return AST_VECTOR_SIZE(&topic->subscribers);
376 }
377
/*!
 * \internal
 * \brief A single subscriber's attachment to a topic.
 *
 * Allocated as an ao2 object by internal_stasis_subscribe(); the object's
 * ao2 lock is the lock paired with \c join_cond.
 */
struct stasis_subscription {
        /*! Unique ID for this subscription */
        char uniqueid[AST_UUID_STR_LEN];
        /*! Topic subscribed to. The subscription holds a reference to it. */
        struct stasis_topic *topic;
        /*! Mailbox for processing incoming messages.
         *  \c NULL when the subscription was created without a mailbox. */
        struct ast_taskprocessor *mailbox;
        /*! Callback function for incoming message processing. */
        stasis_subscription_cb callback;
        /*! Data pointer to be handed to the callback. */
        void *data;

        /*! Condition for joining with subscription. */
        ast_cond_t join_cond;
        /*! Flag set when final message for sub has been received.
         *  Be sure the subscription's ao2 lock is held before reading/setting. */
        int final_message_rxed;
        /*! Flag set when final message for sub has been processed.
         *  Be sure the subscription's ao2 lock is held before reading/setting. */
        int final_message_processed;

        /*! The message types this subscription is accepting.
         *  Indexed by message type id; a non-zero entry means accepted. */
        AST_VECTOR(, char) accepted_message_types;
        /*! The message filter currently in use */
        enum stasis_subscription_message_filter filter;
};
405
406 static void subscription_dtor(void *obj)
407 {
408         struct stasis_subscription *sub = obj;
409
410         /* Subscriptions need to be manually unsubscribed before destruction
411          * b/c there's a cyclic reference between topics and subscriptions */
412         ast_assert(!stasis_subscription_is_subscribed(sub));
413         /* If there are any messages in flight to this subscription; that would
414          * be bad. */
415         ast_assert(stasis_subscription_is_done(sub));
416
417         ao2_cleanup(sub->topic);
418         sub->topic = NULL;
419         ast_taskprocessor_unreference(sub->mailbox);
420         sub->mailbox = NULL;
421         ast_cond_destroy(&sub->join_cond);
422
423         AST_VECTOR_FREE(&sub->accepted_message_types);
424 }
425
426 /*!
427  * \brief Invoke the subscription's callback.
428  * \param sub Subscription to invoke.
429  * \param topic Topic message was published to.
430  * \param message Message to send.
431  */
432 static void subscription_invoke(struct stasis_subscription *sub,
433                                   struct stasis_message *message)
434 {
435         unsigned int final = stasis_subscription_final_message(sub, message);
436         int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
437
438         /* Notify that the final message has been received */
439         if (final) {
440                 ao2_lock(sub);
441                 sub->final_message_rxed = 1;
442                 ast_cond_signal(&sub->join_cond);
443                 ao2_unlock(sub);
444         }
445
446         if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
447                 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
448                 /* Since sub is mostly immutable, no need to lock sub */
449                 sub->callback(sub->data, sub, message);
450         }
451
452         /* Notify that the final message has been processed */
453         if (final) {
454                 ao2_lock(sub);
455                 sub->final_message_processed = 1;
456                 ast_cond_signal(&sub->join_cond);
457                 ao2_unlock(sub);
458         }
459 }
460
461 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
462 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
463
/*!
 * \brief Subscription callback that ignores every message.
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
        /* Intentionally does nothing with its arguments. */
        (void) data;
        (void) sub;
        (void) message;
}
467
468 struct stasis_subscription *internal_stasis_subscribe(
469         struct stasis_topic *topic,
470         stasis_subscription_cb callback,
471         void *data,
472         int needs_mailbox,
473         int use_thread_pool)
474 {
475         struct stasis_subscription *sub;
476
477         if (!topic) {
478                 return NULL;
479         }
480
481         /* The ao2 lock is used for join_cond. */
482         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
483         if (!sub) {
484                 return NULL;
485         }
486         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
487
488         if (needs_mailbox) {
489                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
490
491                 /* Create name with seq number appended. */
492                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
493                         use_thread_pool ? 'p' : 'm',
494                         stasis_topic_name(topic));
495
496                 /*
497                  * With a small number of subscribers, a thread-per-sub is
498                  * acceptable. For a large number of subscribers, a thread
499                  * pool should be used.
500                  */
501                 if (use_thread_pool) {
502                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
503                 } else {
504                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
505                 }
506                 if (!sub->mailbox) {
507                         ao2_ref(sub, -1);
508
509                         return NULL;
510                 }
511                 ast_taskprocessor_set_local(sub->mailbox, sub);
512                 /* Taskprocessor has a reference */
513                 ao2_ref(sub, +1);
514         }
515
516         ao2_ref(topic, +1);
517         sub->topic = topic;
518         sub->callback = callback;
519         sub->data = data;
520         ast_cond_init(&sub->join_cond, NULL);
521         sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
522         AST_VECTOR_INIT(&sub->accepted_message_types, 0);
523
524         if (topic_add_subscription(topic, sub) != 0) {
525                 ao2_ref(sub, -1);
526
527                 return NULL;
528         }
529         send_subscription_subscribe(topic, sub);
530
531         return sub;
532 }
533
534 struct stasis_subscription *stasis_subscribe(
535         struct stasis_topic *topic,
536         stasis_subscription_cb callback,
537         void *data)
538 {
539         return internal_stasis_subscribe(topic, callback, data, 1, 0);
540 }
541
542 struct stasis_subscription *stasis_subscribe_pool(
543         struct stasis_topic *topic,
544         stasis_subscription_cb callback,
545         void *data)
546 {
547         return internal_stasis_subscribe(topic, callback, data, 1, 1);
548 }
549
/*!
 * \brief Taskprocessor task that releases one reference on a subscription.
 *
 * stasis_unsubscribe() pushes this onto the subscription's mailbox to remove
 * the reference the mailbox has on the subscription.
 *
 * \param data The stasis_subscription to unreference.
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
        struct stasis_subscription *mailbox_ref = data;

        ao2_cleanup(mailbox_ref);

        return 0;
}
556
557 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
558 {
559         /* The subscription may be the last ref to this topic. Hold
560          * the topic ref open until after the unlock. */
561         struct stasis_topic *topic;
562
563         if (!sub) {
564                 return NULL;
565         }
566
567         topic = ao2_bump(sub->topic);
568
569         /* We have to remove the subscription first, to ensure the unsubscribe
570          * is the final message */
571         if (topic_remove_subscription(sub->topic, sub) != 0) {
572                 ast_log(LOG_ERROR,
573                         "Internal error: subscription has invalid topic\n");
574                 ao2_cleanup(topic);
575
576                 return NULL;
577         }
578
579         /* Now let everyone know about the unsubscribe */
580         send_subscription_unsubscribe(topic, sub);
581
582         /* When all that's done, remove the ref the mailbox has on the sub */
583         if (sub->mailbox) {
584                 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
585                         /* Nothing we can do here, the conditional is just to keep
586                          * the compiler happy that we're not ignoring the result. */
587                 }
588         }
589
590         /* Unsubscribing unrefs the subscription */
591         ao2_cleanup(sub);
592         ao2_cleanup(topic);
593
594         return NULL;
595 }
596
597 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
598         long low_water, long high_water)
599 {
600         int res = -1;
601
602         if (subscription) {
603                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
604                         low_water, high_water);
605         }
606         return res;
607 }
608
609 int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
610         const struct stasis_message_type *type)
611 {
612         if (!subscription) {
613                 return -1;
614         }
615
616         ast_assert(type != NULL);
617         ast_assert(stasis_message_type_name(type) != NULL);
618
619         if (!type || !stasis_message_type_name(type)) {
620                 /* Filtering is unreliable as this message type is not yet initialized
621                  * so force all messages through.
622                  */
623                 subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
624                 return 0;
625         }
626
627         ao2_lock(subscription->topic);
628         if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
629                 /* We do this for the same reason as above. The subscription can still operate, so allow
630                  * it to do so by forcing all messages through.
631                  */
632                 subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
633         }
634         ao2_unlock(subscription->topic);
635
636         return 0;
637 }
638
639 int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
640         const struct stasis_message_type *type)
641 {
642         if (!subscription) {
643                 return -1;
644         }
645
646         ast_assert(type != NULL);
647         ast_assert(stasis_message_type_name(type) != NULL);
648
649         if (!type || !stasis_message_type_name(type)) {
650                 return 0;
651         }
652
653         ao2_lock(subscription->topic);
654         if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
655                 /* The memory is already allocated so this can't fail */
656                 AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
657         }
658         ao2_unlock(subscription->topic);
659
660         return 0;
661 }
662
663 int stasis_subscription_set_filter(struct stasis_subscription *subscription,
664         enum stasis_subscription_message_filter filter)
665 {
666         if (!subscription) {
667                 return -1;
668         }
669
670         ao2_lock(subscription->topic);
671         if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
672                 subscription->filter = filter;
673         }
674         ao2_unlock(subscription->topic);
675
676         return 0;
677 }
678
679 void stasis_subscription_join(struct stasis_subscription *subscription)
680 {
681         if (subscription) {
682                 ao2_lock(subscription);
683                 /* Wait until the processed flag has been set */
684                 while (!subscription->final_message_processed) {
685                         ast_cond_wait(&subscription->join_cond,
686                                 ao2_object_get_lockaddr(subscription));
687                 }
688                 ao2_unlock(subscription);
689         }
690 }
691
692 int stasis_subscription_is_done(struct stasis_subscription *subscription)
693 {
694         if (subscription) {
695                 int ret;
696
697                 ao2_lock(subscription);
698                 ret = subscription->final_message_rxed;
699                 ao2_unlock(subscription);
700
701                 return ret;
702         }
703
704         /* Null subscription is about as done as you can get */
705         return 1;
706 }
707
/*!
 * \brief Cancel a subscription and wait for its final message to be processed.
 *
 * \param subscription Subscription to cancel. \c NULL is a no-op.
 * \return Always \c NULL.
 */
struct stasis_subscription *stasis_unsubscribe_and_join(
        struct stasis_subscription *subscription)
{
        if (subscription == NULL) {
                return NULL;
        }

        /* stasis_unsubscribe() drops a reference; take an extra one so the
         * object is still valid for the join that follows. */
        ao2_ref(subscription, +1);
        stasis_unsubscribe(subscription);
        stasis_subscription_join(subscription);
        /* Give back the temporary reference. */
        ao2_ref(subscription, -1);

        return NULL;
}
723
724 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
725 {
726         if (sub) {
727                 size_t i;
728                 struct stasis_topic *topic = sub->topic;
729
730                 ao2_lock(topic);
731                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
732                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
733                                 ao2_unlock(topic);
734                                 return 1;
735                         }
736                 }
737                 ao2_unlock(topic);
738         }
739
740         return 0;
741 }
742
743 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
744 {
745         return sub->uniqueid;
746 }
747
748 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
749 {
750         struct stasis_subscription_change *change;
751
752         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
753                 return 0;
754         }
755
756         change = stasis_message_data(msg);
757         if (strcmp("Unsubscribe", change->description)) {
758                 return 0;
759         }
760
761         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
762                 return 0;
763         }
764
765         return 1;
766 }
767
768 /*!
769  * \brief Add a subscriber to a topic.
770  * \param topic Topic
771  * \param sub Subscriber
772  * \return 0 on success
773  * \return Non-zero on error
774  */
775 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
776 {
777         size_t idx;
778
779         ao2_lock(topic);
780         /* The reference from the topic to the subscription is shared with
781          * the owner of the subscription, which will explicitly unsubscribe
782          * to release it.
783          *
784          * If we bumped the refcount here, the owner would have to unsubscribe
785          * and cleanup, which is a bit awkward. */
786         AST_VECTOR_APPEND(&topic->subscribers, sub);
787
788         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
789                 topic_add_subscription(
790                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
791         }
792         ao2_unlock(topic);
793
794         return 0;
795 }
796
797 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
798 {
799         size_t idx;
800         int res;
801
802         ao2_lock(topic);
803         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
804                 topic_remove_subscription(
805                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
806         }
807         res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
808                 AST_VECTOR_ELEM_CLEANUP_NOOP);
809         ao2_unlock(topic);
810
811         return res;
812 }
813
814 /*!
815  * \internal \brief Dispatch a message to a subscriber asynchronously
816  * \param local \ref ast_taskprocessor_local object
817  * \return 0
818  */
819 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
820 {
821         struct stasis_subscription *sub = local->local_data;
822         struct stasis_message *message = local->data;
823
824         subscription_invoke(sub, message);
825         ao2_cleanup(message);
826
827         return 0;
828 }
829
830 /*!
831  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
832  * a published message to a subscriber
833  */
834 struct sync_task_data {
835         ast_mutex_t lock;
836         ast_cond_t cond;
837         int complete;
838         void *task_data;
839 };
840
841 /*!
842  * \internal \brief Dispatch a message to a subscriber synchronously
843  * \param local \ref ast_taskprocessor_local object
844  * \return 0
845  */
846 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
847 {
848         struct stasis_subscription *sub = local->local_data;
849         struct sync_task_data *std = local->data;
850         struct stasis_message *message = std->task_data;
851
852         subscription_invoke(sub, message);
853         ao2_cleanup(message);
854
855         ast_mutex_lock(&std->lock);
856         std->complete = 1;
857         ast_cond_signal(&std->cond);
858         ast_mutex_unlock(&std->lock);
859
860         return 0;
861 }
862
863 /*!
864  * \internal \brief Dispatch a message to a subscriber
865  * \param sub The subscriber to dispatch to
866  * \param message The message to send
867  * \param synchronous If non-zero, synchronize on the subscriber receiving
868  * the message
869  */
870 static void dispatch_message(struct stasis_subscription *sub,
871         struct stasis_message *message,
872         int synchronous)
873 {
874         /* Determine if this subscription is interested in this message. Note that final
875          * messages are special and are always invoked on the subscription.
876          */
877         if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
878                 int message_type_id = stasis_message_type_id(stasis_message_type(message));
879                 if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
880                         !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
881                         !stasis_subscription_final_message(sub, message)) {
882                         return;
883                 }
884         }
885
886         if (!sub->mailbox) {
887                 /* Dispatch directly */
888                 subscription_invoke(sub, message);
889                 return;
890         }
891
892         /* Bump the message for the taskprocessor push. This will get de-ref'd
893          * by the task processor callback.
894          */
895         ao2_bump(message);
896         if (!synchronous) {
897                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
898                         /* Push failed; ugh. */
899                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
900                         ao2_cleanup(message);
901                 }
902         } else {
903                 struct sync_task_data std;
904
905                 ast_mutex_init(&std.lock);
906                 ast_cond_init(&std.cond, NULL);
907                 std.complete = 0;
908                 std.task_data = message;
909
910                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
911                         /* Push failed; ugh. */
912                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
913                         ao2_cleanup(message);
914                         ast_mutex_destroy(&std.lock);
915                         ast_cond_destroy(&std.cond);
916                         return;
917                 }
918
919                 ast_mutex_lock(&std.lock);
920                 while (!std.complete) {
921                         ast_cond_wait(&std.cond, &std.lock);
922                 }
923                 ast_mutex_unlock(&std.lock);
924
925                 ast_mutex_destroy(&std.lock);
926                 ast_cond_destroy(&std.cond);
927         }
928 }
929
930 /*!
931  * \internal \brief Publish a message to a topic's subscribers
932  * \brief topic The topic to publish to
933  * \brief message The message to publish
934  * \brief sync_sub An optional subscriber of the topic to publish synchronously
935  * to
936  */
937 static void publish_msg(struct stasis_topic *topic,
938         struct stasis_message *message, struct stasis_subscription *sync_sub)
939 {
940         size_t i;
941
942         ast_assert(topic != NULL);
943         ast_assert(message != NULL);
944
945         /* If there are no subscribers don't bother */
946         if (!stasis_topic_subscribers(topic)) {
947                 return;
948         }
949
950         /*
951          * The topic may be unref'ed by the subscription invocation.
952          * Make sure we hold onto a reference while dispatching.
953          */
954         ao2_ref(topic, +1);
955         ao2_lock(topic);
956         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
957                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
958
959                 ast_assert(sub != NULL);
960
961                 dispatch_message(sub, message, (sub == sync_sub));
962         }
963         ao2_unlock(topic);
964         ao2_ref(topic, -1);
965 }
966
/*!
 * \brief Publish a message to a topic without synchronizing on any subscriber.
 *
 * \param topic Topic to publish to
 * \param message Message to publish
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
        struct stasis_subscription *no_sync_sub = NULL;

        publish_msg(topic, message, no_sync_sub);
}
971
972 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
973 {
974         ast_assert(sub != NULL);
975
976         publish_msg(sub->topic, message, sub);
977 }
978
/*!
 * \brief Forwarding information
 *
 * Every message posted to \a from_topic is forwarded to \a to_topic.
 *
 * When both \a from_topic and \a to_topic must be locked, always lock
 * \a to_topic first and \a from_topic second; any other order risks a
 * deadlock.
 */
struct stasis_forward {
        /*! Topic the messages originate from */
        struct stasis_topic *from_topic;
        /*! Topic the messages are forwarded to */
        struct stasis_topic *to_topic;
};
993
994 static void forward_dtor(void *obj)
995 {
996         struct stasis_forward *forward = obj;
997
998         ao2_cleanup(forward->from_topic);
999         forward->from_topic = NULL;
1000         ao2_cleanup(forward->to_topic);
1001         forward->to_topic = NULL;
1002 }
1003
1004 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
1005 {
1006         int idx;
1007         struct stasis_topic *from;
1008         struct stasis_topic *to;
1009
1010         if (!forward) {
1011                 return NULL;
1012         }
1013
1014         from = forward->from_topic;
1015         to = forward->to_topic;
1016
1017         if (from && to) {
1018                 topic_lock_both(to, from);
1019                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1020                         AST_VECTOR_ELEM_CLEANUP_NOOP);
1021
1022                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1023                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1024                 }
1025                 ao2_unlock(from);
1026                 ao2_unlock(to);
1027         }
1028
1029         ao2_cleanup(forward);
1030
1031         return NULL;
1032 }
1033
1034 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
1035         struct stasis_topic *to_topic)
1036 {
1037         int res;
1038         size_t idx;
1039         struct stasis_forward *forward;
1040
1041         if (!from_topic || !to_topic) {
1042                 return NULL;
1043         }
1044
1045         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1046         if (!forward) {
1047                 return NULL;
1048         }
1049
1050         /* Forwards to ourselves are implicit. */
1051         if (to_topic == from_topic) {
1052                 return forward;
1053         }
1054
1055         forward->from_topic = ao2_bump(from_topic);
1056         forward->to_topic = ao2_bump(to_topic);
1057
1058         topic_lock_both(to_topic, from_topic);
1059         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1060         if (res != 0) {
1061                 ao2_unlock(from_topic);
1062                 ao2_unlock(to_topic);
1063                 ao2_ref(forward, -1);
1064                 return NULL;
1065         }
1066
1067         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1068                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1069         }
1070         ao2_unlock(from_topic);
1071         ao2_unlock(to_topic);
1072
1073         return forward;
1074 }
1075
1076 static void subscription_change_dtor(void *obj)
1077 {
1078         struct stasis_subscription_change *change = obj;
1079
1080         ast_string_field_free_memory(change);
1081         ao2_cleanup(change->topic);
1082 }
1083
1084 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
1085 {
1086         struct stasis_subscription_change *change;
1087
1088         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
1089         if (!change || ast_string_field_init(change, 128)) {
1090                 ao2_cleanup(change);
1091                 return NULL;
1092         }
1093
1094         ast_string_field_set(change, uniqueid, uniqueid);
1095         ast_string_field_set(change, description, description);
1096         ao2_ref(topic, +1);
1097         change->topic = topic;
1098
1099         return change;
1100 }
1101
1102 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
1103 {
1104         struct stasis_subscription_change *change;
1105         struct stasis_message *msg;
1106
1107         /* This assumes that we have already unsubscribed */
1108         ast_assert(stasis_subscription_is_subscribed(sub));
1109
1110         if (!stasis_subscription_change_type()) {
1111                 return;
1112         }
1113
1114         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1115         if (!change) {
1116                 return;
1117         }
1118
1119         msg = stasis_message_create(stasis_subscription_change_type(), change);
1120         if (!msg) {
1121                 ao2_cleanup(change);
1122                 return;
1123         }
1124
1125         stasis_publish(topic, msg);
1126         ao2_cleanup(msg);
1127         ao2_cleanup(change);
1128 }
1129
1130 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1131         struct stasis_subscription *sub)
1132 {
1133         struct stasis_subscription_change *change;
1134         struct stasis_message *msg;
1135
1136         /* This assumes that we have already unsubscribed */
1137         ast_assert(!stasis_subscription_is_subscribed(sub));
1138
1139         if (!stasis_subscription_change_type()) {
1140                 return;
1141         }
1142
1143         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1144         if (!change) {
1145                 return;
1146         }
1147
1148         msg = stasis_message_create(stasis_subscription_change_type(), change);
1149         if (!msg) {
1150                 ao2_cleanup(change);
1151                 return;
1152         }
1153
1154         stasis_publish(topic, msg);
1155
1156         /* Now we have to dispatch to the subscription itself */
1157         dispatch_message(sub, msg, 0);
1158
1159         ao2_cleanup(msg);
1160         ao2_cleanup(change);
1161 }
1162
/*! \brief One pooled topic together with its forward to the pool's topic */
struct topic_pool_entry {
        /*! Forward from \c topic to the owning pool's topic */
        struct stasis_forward *forward;
        /*! The pooled topic itself */
        struct stasis_topic *topic;
};
1167
1168 static void topic_pool_entry_dtor(void *obj)
1169 {
1170         struct topic_pool_entry *entry = obj;
1171
1172         entry->forward = stasis_forward_cancel(entry->forward);
1173         ao2_cleanup(entry->topic);
1174         entry->topic = NULL;
1175 }
1176
1177 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1178 {
1179         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1180                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1181 }
1182
/*! \brief A collection of named topics that all forward to one topic */
struct stasis_topic_pool {
        /*! Container of \ref topic_pool_entry objects, keyed by topic name */
        struct ao2_container *pool_container;
        /*! Topic that every pooled topic forwards to */
        struct stasis_topic *pool_topic;
};
1187
1188 static void topic_pool_dtor(void *obj)
1189 {
1190         struct stasis_topic_pool *pool = obj;
1191
1192 #ifdef AO2_DEBUG
1193         {
1194                 char *container_name =
1195                         ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1196                 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1197                 ao2_container_unregister(container_name);
1198         }
1199 #endif
1200
1201         ao2_cleanup(pool->pool_container);
1202         pool->pool_container = NULL;
1203         ao2_cleanup(pool->pool_topic);
1204         pool->pool_topic = NULL;
1205 }
1206
1207 static int topic_pool_entry_hash(const void *obj, const int flags)
1208 {
1209         const struct topic_pool_entry *object;
1210         const char *key;
1211
1212         switch (flags & OBJ_SEARCH_MASK) {
1213         case OBJ_SEARCH_KEY:
1214                 key = obj;
1215                 break;
1216         case OBJ_SEARCH_OBJECT:
1217                 object = obj;
1218                 key = stasis_topic_name(object->topic);
1219                 break;
1220         default:
1221                 /* Hash can only work on something with a full key. */
1222                 ast_assert(0);
1223                 return 0;
1224         }
1225         return ast_str_case_hash(key);
1226 }
1227
1228 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1229 {
1230         const struct topic_pool_entry *object_left = obj;
1231         const struct topic_pool_entry *object_right = arg;
1232         const char *right_key = arg;
1233         int cmp;
1234
1235         switch (flags & OBJ_SEARCH_MASK) {
1236         case OBJ_SEARCH_OBJECT:
1237                 right_key = stasis_topic_name(object_right->topic);
1238                 /* Fall through */
1239         case OBJ_SEARCH_KEY:
1240                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1241                 break;
1242         case OBJ_SEARCH_PARTIAL_KEY:
1243                 /* Not supported by container */
1244                 ast_assert(0);
1245                 cmp = -1;
1246                 break;
1247         default:
1248                 /*
1249                  * What arg points to is specific to this traversal callback
1250                  * and has no special meaning to astobj2.
1251                  */
1252                 cmp = 0;
1253                 break;
1254         }
1255         if (cmp) {
1256                 return 0;
1257         }
1258         /*
1259          * At this point the traversal callback is identical to a sorted
1260          * container.
1261          */
1262         return CMP_MATCH;
1263 }
1264
#ifdef AO2_DEBUG
/*!
 * \internal \brief Print callback for the registered topic pool container
 *
 * Prints the topic name of the entry; a \c NULL entry prints nothing.
 */
static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
        struct topic_pool_entry *entry = v_obj;

        if (entry) {
                prnt(where, "%s", stasis_topic_name(entry->topic));
        }
}
#endif
1276
1277 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1278 {
1279         struct stasis_topic_pool *pool;
1280
1281         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1282         if (!pool) {
1283                 return NULL;
1284         }
1285
1286         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1287                 topic_pool_entry_hash, topic_pool_entry_cmp);
1288         if (!pool->pool_container) {
1289                 ao2_cleanup(pool);
1290                 return NULL;
1291         }
1292
1293 #ifdef AO2_DEBUG
1294         {
1295                 char *container_name =
1296                         ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1297                 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1298                 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1299         }
1300 #endif
1301
1302         ao2_ref(pooled_topic, +1);
1303         pool->pool_topic = pooled_topic;
1304
1305         return pool;
1306 }
1307
1308 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1309 {
1310         ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1311 }
1312
1313 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1314 {
1315         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1316         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1317
1318         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1319         if (topic_pool_entry) {
1320                 return topic_pool_entry->topic;
1321         }
1322
1323         topic_pool_entry = topic_pool_entry_alloc();
1324         if (!topic_pool_entry) {
1325                 return NULL;
1326         }
1327
1328         topic_pool_entry->topic = stasis_topic_create(topic_name);
1329         if (!topic_pool_entry->topic) {
1330                 return NULL;
1331         }
1332
1333         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1334         if (!topic_pool_entry->forward) {
1335                 return NULL;
1336         }
1337
1338         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1339                 return NULL;
1340         }
1341
1342         return topic_pool_entry->topic;
1343 }
1344
1345 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1346 {
1347         struct topic_pool_entry *topic_pool_entry;
1348
1349         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1350         if (!topic_pool_entry) {
1351                 return 0;
1352         }
1353
1354         ao2_ref(topic_pool_entry, -1);
1355         return 1;
1356 }
1357
/*!
 * \brief Log use of a message type outside of its lifetime.
 *
 * Only AST_DEVMODE builds log anything, and then only for message types that
 * stasis_message_type_declined() does not report as declined.
 *
 * \param name Name of the message type that was accessed
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
        if (stasis_message_type_declined(name)) {
                return;
        }

        ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1366
1367 /*! \brief A multi object blob data structure to carry user event stasis messages */
1368 struct ast_multi_object_blob {
1369         struct ast_json *blob;                             /*< A blob of JSON data */
1370         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1371 };
1372
1373 /*!
1374  * \internal
1375  * \brief Destructor for \ref ast_multi_object_blob objects
1376  */
1377 static void multi_object_blob_dtor(void *obj)
1378 {
1379         struct ast_multi_object_blob *multi = obj;
1380         int type;
1381         int i;
1382
1383         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1384                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1385                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1386                 }
1387                 AST_VECTOR_FREE(&multi->snapshots[type]);
1388         }
1389         ast_json_unref(multi->blob);
1390 }
1391
1392 /*! \brief Create a stasis user event multi object blob */
1393 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1394 {
1395         int type;
1396         struct ast_multi_object_blob *multi;
1397
1398         ast_assert(blob != NULL);
1399
1400         multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1401         if (!multi) {
1402                 return NULL;
1403         }
1404
1405         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1406                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1407                         ao2_ref(multi, -1);
1408
1409                         return NULL;
1410                 }
1411         }
1412
1413         multi->blob = ast_json_ref(blob);
1414
1415         return multi;
1416 }
1417
1418 /*! \brief Add an object (snapshot) to the blob */
1419 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1420         enum stasis_user_multi_object_snapshot_type type, void *object)
1421 {
1422         if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
1423                 ao2_cleanup(object);
1424         }
1425 }
1426
1427 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1428 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1429         struct stasis_message_type *type, struct ast_json *blob)
1430 {
1431         struct stasis_message *message;
1432         struct ast_channel_snapshot *channel_snapshot;
1433         struct ast_multi_object_blob *multi;
1434
1435         if (!type) {
1436                 return;
1437         }
1438
1439         multi = ast_multi_object_blob_create(blob);
1440         if (!multi) {
1441                 return;
1442         }
1443
1444         channel_snapshot = ast_channel_snapshot_create(chan);
1445         if (!channel_snapshot) {
1446                 ao2_ref(multi, -1);
1447                 return;
1448         }
1449
1450         /* this call steals the channel_snapshot reference */
1451         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1452
1453         message = stasis_message_create(type, multi);
1454         ao2_ref(multi, -1);
1455         if (message) {
1456                 /* app_userevent still publishes to channel */
1457                 stasis_publish(ast_channel_topic(chan), message);
1458                 ao2_ref(message, -1);
1459         }
1460 }
1461
1462 /*! \internal \brief convert multi object blob to ari json */
1463 static struct ast_json *multi_user_event_to_json(
1464         struct stasis_message *message,
1465         const struct stasis_message_sanitizer *sanitize)
1466 {
1467         struct ast_json *out;
1468         struct ast_multi_object_blob *multi = stasis_message_data(message);
1469         struct ast_json *blob = multi->blob;
1470         const struct timeval *tv = stasis_message_timestamp(message);
1471         enum stasis_user_multi_object_snapshot_type type;
1472         int i;
1473
1474         out = ast_json_object_create();
1475         if (!out) {
1476                 return NULL;
1477         }
1478
1479         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1480         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1481         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1482         ast_json_object_set(out, "userevent", ast_json_ref(blob));
1483
1484         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1485                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1486                         struct ast_json *json_object = NULL;
1487                         char *name = NULL;
1488                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1489
1490                         switch (type) {
1491                         case STASIS_UMOS_CHANNEL:
1492                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1493                                 name = "channel";
1494                                 break;
1495                         case STASIS_UMOS_BRIDGE:
1496                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1497                                 name = "bridge";
1498                                 break;
1499                         case STASIS_UMOS_ENDPOINT:
1500                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1501                                 name = "endpoint";
1502                                 break;
1503                         }
1504                         if (json_object) {
1505                                 ast_json_object_set(out, name, json_object);
1506                         }
1507                 }
1508         }
1509
1510         return out;
1511 }
1512
1513 /*! \internal \brief convert multi object blob to ami string */
1514 static struct ast_str *multi_object_blob_to_ami(void *obj)
1515 {
1516         struct ast_str *ami_str=ast_str_create(1024);
1517         struct ast_str *ami_snapshot;
1518         const struct ast_multi_object_blob *multi = obj;
1519         enum stasis_user_multi_object_snapshot_type type;
1520         int i;
1521
1522         if (!ami_str) {
1523                 return NULL;
1524         }
1525         if (!multi) {
1526                 ast_free(ami_str);
1527                 return NULL;
1528         }
1529
1530         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1531                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1532                         char *name = NULL;
1533                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1534                         ami_snapshot = NULL;
1535
1536                         if (i > 0) {
1537                                 ast_asprintf(&name, "%d", i + 1);
1538                         }
1539
1540                         switch (type) {
1541                         case STASIS_UMOS_CHANNEL:
1542                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
1543                                 break;
1544
1545                         case STASIS_UMOS_BRIDGE:
1546                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
1547                                 break;
1548
1549                         case STASIS_UMOS_ENDPOINT:
1550                                 /* currently not sending endpoint snapshots to AMI */
1551                                 break;
1552                         }
1553                         if (ami_snapshot) {
1554                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1555                                 ast_free(ami_snapshot);
1556                         }
1557                         ast_free(name);
1558                 }
1559         }
1560
1561         return ami_str;
1562 }
1563
/*!
 * \internal
 * \brief Callback to pass only user defined parameters from blob
 *
 * \param key The blob key under consideration.
 *
 * \retval 1 if \a key is exactly "eventname".
 * \retval 0 for any other key.
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0;
}
1572
1573 static struct ast_manager_event_blob *multi_user_event_to_ami(
1574         struct stasis_message *message)
1575 {
1576         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1577         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1578         struct ast_multi_object_blob *multi = stasis_message_data(message);
1579         const char *eventname;
1580
1581         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1582         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1583         object_string = multi_object_blob_to_ami(multi);
1584         if (!object_string || !body) {
1585                 return NULL;
1586         }
1587
1588         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1589                 "%s"
1590                 "UserEvent: %s\r\n"
1591                 "%s",
1592                 ast_str_buffer(object_string),
1593                 eventname,
1594                 ast_str_buffer(body));
1595 }
1596
/*! \brief A structure to hold global configuration-related options */
struct stasis_declined_config {
	/*! The list of message types to decline (looked up by message type name) */
	struct ao2_container *declined;
};
1602
/*! \brief Threadpool configuration options */
struct stasis_threadpool_conf {
	/*! Initial size of the thread pool */
	int initial_size;
	/*! Time, in seconds, before we expire a thread */
	int idle_timeout_sec;
	/*! Maximum number of threads to allow */
	int max_size;
};
1612
/*! \brief Top-level Stasis configuration object; owns both option groups below */
struct stasis_config {
	/*! Thread pool configuration options */
	struct stasis_threadpool_conf *threadpool_options;
	/*! Declined message types */
	struct stasis_declined_config *declined_message_types;
};
1619
1620 static struct aco_type threadpool_option = {
1621         .type = ACO_GLOBAL,
1622         .name = "threadpool",
1623         .item_offset = offsetof(struct stasis_config, threadpool_options),
1624         .category = "threadpool",
1625         .category_match = ACO_WHITELIST_EXACT,
1626 };
1627
1628 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1629
1630 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1631 static struct aco_type declined_option = {
1632         .type = ACO_GLOBAL,
1633         .name = "declined_message_types",
1634         .item_offset = offsetof(struct stasis_config, declined_message_types),
1635         .category_match = ACO_WHITELIST_EXACT,
1636         .category = "declined_message_types",
1637 };
1638
1639 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1640
/*! \brief Describes stasis.conf and the category types that may appear in it */
struct aco_file stasis_conf = {
	.filename = "stasis.conf",
	.types = ACO_TYPES(&declined_option, &threadpool_option),
};
1645
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
static AO2_GLOBAL_OBJ_STATIC(globals);

/* Forward declaration: the allocator is referenced by CONFIG_INFO_CORE before it is defined. */
static void *stasis_config_alloc(void);

/*! \brief Register information about the configs being processed by this module */
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
	.files = ACO_FILES(&stasis_conf),
);
1655
1656 static void stasis_declined_config_destructor(void *obj)
1657 {
1658         struct stasis_declined_config *declined = obj;
1659
1660         ao2_cleanup(declined->declined);
1661 }
1662
1663 static void stasis_config_destructor(void *obj)
1664 {
1665         struct stasis_config *cfg = obj;
1666
1667         ao2_cleanup(cfg->declined_message_types);
1668         ast_free(cfg->threadpool_options);
1669 }
1670
1671 static void *stasis_config_alloc(void)
1672 {
1673         struct stasis_config *cfg;
1674
1675         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1676                 return NULL;
1677         }
1678
1679         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1680         if (!cfg->threadpool_options) {
1681                 ao2_ref(cfg, -1);
1682                 return NULL;
1683         }
1684
1685         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1686                 stasis_declined_config_destructor);
1687         if (!cfg->declined_message_types) {
1688                 ao2_ref(cfg, -1);
1689                 return NULL;
1690         }
1691
1692         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1693         if (!cfg->declined_message_types->declined) {
1694                 ao2_ref(cfg, -1);
1695                 return NULL;
1696         }
1697
1698         return cfg;
1699 }
1700
1701 int stasis_message_type_declined(const char *name)
1702 {
1703         struct stasis_config *cfg = ao2_global_obj_ref(globals);
1704         char *name_in_declined;
1705         int res;
1706
1707         if (!cfg || !cfg->declined_message_types) {
1708                 ao2_cleanup(cfg);
1709                 return 0;
1710         }
1711
1712         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1713         res = name_in_declined ? 1 : 0;
1714         ao2_cleanup(name_in_declined);
1715         ao2_ref(cfg, -1);
1716         if (res) {
1717                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1718         }
1719         return res;
1720 }
1721
1722 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1723 {
1724         struct stasis_declined_config *declined = obj;
1725
1726         if (ast_strlen_zero(var->value)) {
1727                 return 0;
1728         }
1729
1730         if (ast_str_container_add(declined->declined, var->value)) {
1731                 return -1;
1732         }
1733
1734         return 0;
1735 }
1736
/*!
 * @{ \brief Define multi user event message type(s).
 *
 * The type is converted to JSON by multi_user_event_to_json() and to AMI by
 * multi_user_event_to_ami().
 */

STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
	.to_json = multi_user_event_to_json,
	.to_ami = multi_user_event_to_ami,
	);

/*! @} */
1747
/*!
 * \brief Cleanup function for graceful shutdowns
 *
 * Registered by stasis_init() through ast_register_cleanup().
 */
static void stasis_cleanup(void)
{
	/* Stop the threadpool and clear the global so it is not reused. */
	ast_threadpool_shutdown(pool);
	pool = NULL;
	/* Release the message types set up by stasis_init(). */
	STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
	/* Tear down the config framework state and drop the active configuration. */
	aco_info_destroy(&cfg_info);
	ao2_global_obj_release(globals);
}
1758
1759 int stasis_init(void)
1760 {
1761         struct stasis_config *cfg;
1762         int cache_init;
1763         struct ast_threadpool_options threadpool_opts = { 0, };
1764
1765         /* Be sure the types are cleaned up after the message bus */
1766         ast_register_cleanup(stasis_cleanup);
1767
1768         if (aco_info_init(&cfg_info)) {
1769                 return -1;
1770         }
1771
1772         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1773                 declined_options, "", declined_handler, 0);
1774         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1775                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1776                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1777                 INT_MAX);
1778         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1779                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1780                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1781                 INT_MAX);
1782         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1783                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1784                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1785                 INT_MAX);
1786
1787         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1788                 struct stasis_config *default_cfg = stasis_config_alloc();
1789
1790                 if (!default_cfg) {
1791                         return -1;
1792                 }
1793
1794                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1795                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1796                         ao2_ref(default_cfg, -1);
1797
1798                         return -1;
1799                 }
1800
1801                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1802                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1803                         ao2_ref(default_cfg, -1);
1804
1805                         return -1;
1806                 }
1807
1808                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1809                 ao2_global_obj_replace_unref(globals, default_cfg);
1810                 cfg = default_cfg;
1811         } else {
1812                 cfg = ao2_global_obj_ref(globals);
1813                 if (!cfg) {
1814                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1815
1816                         return -1;
1817                 }
1818         }
1819
1820         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1821         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1822         threadpool_opts.auto_increment = 1;
1823         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1824         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1825         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1826         ao2_ref(cfg, -1);
1827         if (!pool) {
1828                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1829
1830                 return -1;
1831         }
1832
1833         cache_init = stasis_cache_init();
1834         if (cache_init != 0) {
1835                 return -1;
1836         }
1837
1838         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1839                 return -1;
1840         }
1841         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1842                 return -1;
1843         }
1844
1845         return 0;
1846 }