Stasis: Fix unsafe use of stasis_unsubscribe in modules.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE();
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/threadpool.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/vector.h"
42 #include "asterisk/stasis_channels.h"
43 #include "asterisk/stasis_bridges.h"
44 #include "asterisk/stasis_endpoints.h"
45 #include "asterisk/config_options.h"
46
47 /*** DOCUMENTATION
48         <managerEvent language="en_US" name="UserEvent">
49                 <managerEventInstance class="EVENT_FLAG_USER">
50                         <synopsis>A user defined event raised from the dialplan.</synopsis>
51                         <syntax>
52                                 <channel_snapshot/>
53                                 <parameter name="UserEvent">
54                                         <para>The event name, as specified in the dialplan.</para>
55                                 </parameter>
56                         </syntax>
57                         <description>
58                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
59                         </description>
60                         <see-also>
61                                 <ref type="application">UserEvent</ref>
62                         </see-also>
63                 </managerEventInstance>
64         </managerEvent>
65         <configInfo name="stasis" language="en_US">
66                 <configFile name="stasis.conf">
67                         <configObject name="threadpool">
68                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69                                 <configOption name="initial_size" default="5">
70                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71                                 </configOption>
72                                 <configOption name="idle_timeout_sec" default="20">
73                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74                                 </configOption>
75                                 <configOption name="max_size" default="50">
76                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
77                                 </configOption>
78                         </configObject>
79                         <configObject name="declined_message_types">
80                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
81                                 <configOption name="decline">
82                                         <synopsis>The message type to decline.</synopsis>
83                                         <description>
84                                                 <para>This configuration option defines the name of the Stasis
85                                                 message type that Asterisk is forbidden from creating and can be
86                                                 specified as many times as necessary to achieve the desired result.</para>
87                                                 <enumlist>
88                                                         <enum name="stasis_app_recording_snapshot_type" />
89                                                         <enum name="stasis_app_playback_snapshot_type" />
90                                                         <enum name="stasis_test_message_type" />
91                                                         <enum name="confbridge_start_type" />
92                                                         <enum name="confbridge_end_type" />
93                                                         <enum name="confbridge_join_type" />
94                                                         <enum name="confbridge_leave_type" />
95                                                         <enum name="confbridge_start_record_type" />
96                                                         <enum name="confbridge_stop_record_type" />
97                                                         <enum name="confbridge_mute_type" />
98                                                         <enum name="confbridge_unmute_type" />
99                                                         <enum name="confbridge_talking_type" />
100                                                         <enum name="cel_generic_type" />
101                                                         <enum name="ast_bridge_snapshot_type" />
102                                                         <enum name="ast_bridge_merge_message_type" />
103                                                         <enum name="ast_channel_entered_bridge_type" />
104                                                         <enum name="ast_channel_left_bridge_type" />
105                                                         <enum name="ast_blind_transfer_type" />
106                                                         <enum name="ast_attended_transfer_type" />
107                                                         <enum name="ast_endpoint_snapshot_type" />
108                                                         <enum name="ast_endpoint_state_type" />
109                                                         <enum name="ast_device_state_message_type" />
110                                                         <enum name="ast_test_suite_message_type" />
111                                                         <enum name="ast_mwi_state_type" />
112                                                         <enum name="ast_mwi_vm_app_type" />
113                                                         <enum name="ast_format_register_type" />
114                                                         <enum name="ast_format_unregister_type" />
115                                                         <enum name="ast_manager_get_generic_type" />
116                                                         <enum name="ast_parked_call_type" />
117                                                         <enum name="ast_channel_snapshot_type" />
118                                                         <enum name="ast_channel_dial_type" />
119                                                         <enum name="ast_channel_varset_type" />
120                                                         <enum name="ast_channel_hangup_request_type" />
121                                                         <enum name="ast_channel_dtmf_begin_type" />
122                                                         <enum name="ast_channel_dtmf_end_type" />
123                                                         <enum name="ast_channel_hold_type" />
124                                                         <enum name="ast_channel_unhold_type" />
125                                                         <enum name="ast_channel_chanspy_start_type" />
126                                                         <enum name="ast_channel_chanspy_stop_type" />
127                                                         <enum name="ast_channel_fax_type" />
128                                                         <enum name="ast_channel_hangup_handler_type" />
129                                                         <enum name="ast_channel_moh_start_type" />
130                                                         <enum name="ast_channel_moh_stop_type" />
131                                                         <enum name="ast_channel_monitor_start_type" />
132                                                         <enum name="ast_channel_monitor_stop_type" />
133                                                         <enum name="ast_channel_agent_login_type" />
134                                                         <enum name="ast_channel_agent_logoff_type" />
135                                                         <enum name="ast_channel_talking_start" />
136                                                         <enum name="ast_channel_talking_stop" />
137                                                         <enum name="ast_security_event_type" />
138                                                         <enum name="ast_named_acl_change_type" />
139                                                         <enum name="ast_local_bridge_type" />
140                                                         <enum name="ast_local_optimization_begin_type" />
141                                                         <enum name="ast_local_optimization_end_type" />
142                                                         <enum name="stasis_subscription_change_type" />
143                                                         <enum name="ast_multi_user_event_type" />
144                                                         <enum name="stasis_cache_clear_type" />
145                                                         <enum name="stasis_cache_update_type" />
146                                                         <enum name="ast_network_change_type" />
147                                                         <enum name="ast_system_registry_type" />
148                                                         <enum name="ast_cc_available_type" />
149                                                         <enum name="ast_cc_offertimerstart_type" />
150                                                         <enum name="ast_cc_requested_type" />
151                                                         <enum name="ast_cc_requestacknowledged_type" />
152                                                         <enum name="ast_cc_callerstopmonitoring_type" />
153                                                         <enum name="ast_cc_callerstartmonitoring_type" />
154                                                         <enum name="ast_cc_callerrecalling_type" />
155                                                         <enum name="ast_cc_recallcomplete_type" />
156                                                         <enum name="ast_cc_failure_type" />
157                                                         <enum name="ast_cc_monitorfailed_type" />
158                                                         <enum name="ast_presence_state_message_type" />
159                                                         <enum name="ast_rtp_rtcp_sent_type" />
160                                                         <enum name="ast_rtp_rtcp_received_type" />
161                                                         <enum name="ast_call_pickup_type" />
162                                                         <enum name="aoc_s_type" />
163                                                         <enum name="aoc_d_type" />
164                                                         <enum name="aoc_e_type" />
165                                                         <enum name="dahdichannel_type" />
166                                                         <enum name="mcid_type" />
167                                                         <enum name="session_timeout_type" />
168                                                         <enum name="cdr_read_message_type" />
169                                                         <enum name="cdr_write_message_type" />
170                                                         <enum name="cdr_prop_write_message_type" />
171                                                         <enum name="corosync_ping_message_type" />
172                                                         <enum name="agi_exec_start_type" />
173                                                         <enum name="agi_exec_end_type" />
174                                                         <enum name="agi_async_start_type" />
175                                                         <enum name="agi_async_exec_type" />
176                                                         <enum name="agi_async_end_type" />
177                                                         <enum name="queue_caller_join_type" />
178                                                         <enum name="queue_caller_leave_type" />
179                                                         <enum name="queue_caller_abandon_type" />
180                                                         <enum name="queue_member_status_type" />
181                                                         <enum name="queue_member_added_type" />
182                                                         <enum name="queue_member_removed_type" />
183                                                         <enum name="queue_member_pause_type" />
184                                                         <enum name="queue_member_penalty_type" />
185                                                         <enum name="queue_member_ringinuse_type" />
186                                                         <enum name="queue_agent_called_type" />
187                                                         <enum name="queue_agent_connect_type" />
188                                                         <enum name="queue_agent_complete_type" />
189                                                         <enum name="queue_agent_dump_type" />
190                                                         <enum name="queue_agent_ringnoanswer_type" />
191                                                         <enum name="meetme_join_type" />
192                                                         <enum name="meetme_leave_type" />
193                                                         <enum name="meetme_end_type" />
194                                                         <enum name="meetme_mute_type" />
195                                                         <enum name="meetme_talking_type" />
196                                                         <enum name="meetme_talk_request_type" />
197                                                         <enum name="appcdr_message_type" />
198                                                         <enum name="forkcdr_message_type" />
199                                                         <enum name="cdr_sync_message_type" />
200                                                 </enumlist>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                 </configFile>
205         </configInfo>
206 ***/
207
208 /*!
209  * \page stasis-impl Stasis Implementation Notes
210  *
211  * \par Reference counting
212  *
213  * Stasis introduces a number of objects, which are tightly related to one
214  * another. Because we rely on ref-counting for memory management, understanding
215  * these relationships is important to understanding this code.
216  *
217  * \code{.txt}
218  *
219  *   stasis_topic <----> stasis_subscription
220  *             ^          ^
221  *              \        /
222  *               \      /
223  *               dispatch
224  *                  |
225  *                  |
226  *                  v
227  *            stasis_message
228  *                  |
229  *                  |
230  *                  v
231  *          stasis_message_type
232  *
233  * \endcode
234  *
235  * The most troubling thing in this chart is the cyclic reference between
236  * stasis_topic and stasis_subscription. This is both unfortunate, and
237  * necessary. Topics need the subscription in order to dispatch messages;
238  * subscriptions need the topics to unsubscribe and check subscription status.
239  *
240  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
241  * topic's reference to a subscription. When the subcription is destroyed, it
242  * will remove its reference to the topic.
243  *
244  * This means that until a subscription has be explicitly unsubscribed, it will
245  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
246  * The destructors of both have assertions regarding this to catch ref-counting
247  * problems where a subscription or topic has had an extra ao2_cleanup().
248  *
249  * The \ref dispatch object is a transient object, which is posted to a
250  * subscription's taskprocessor to send a message to the subscriber. They have
251  * short life cycles, allocated on one thread, destroyed on another.
252  *
253  * During shutdown, or the deletion of a domain object, there are a flurry of
254  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
255  * are processed. Any one of these cleanups could be the one to actually destroy
256  * a given object, so care must be taken to ensure that an object isn't
257  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
258  * that might happen when a RAII_VAR() goes out of scope.
259  *
260  * \par Typical life cycles
261  *
262  *  \li stasis_topic - There are several topics which live for the duration of
263  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
264  *      are actually fed by shorter-lived topics whose lifetime is associated
265  *      with some domain object (like ast_channel_topic() for a given
266  *      ast_channel).
267  *
268  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
269  *      topics, for similar reasons.
270  *
271  *  \li dispatch - Very short lived; just long enough to post a message to a
272  *      subscriber.
273  *
274  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
275  *      irrelevant. Messages are strictly data and have no behavior associated
276  *      with them, so it doesn't really matter if/when they are destroyed. By
277  *      design, a component could hold a ref to a message forever without any
278  *      ill consequences (aside from consuming more memory).
279  *
280  *  \li stasis_message_type - Long life cycles, typically only destroyed on
281  *      module unloading or _clean_ process exit.
282  *
283  * \par Subscriber shutdown sequencing
284  *
285  * Subscribers are sensitive to shutdown sequencing, specifically in how the
286  * reference message types. This is fully detailed on the wiki at
287  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
288  *
289  * In short, the lifetime of the \a data (and \a callback, if in a module) must
290  * be held until the stasis_subscription_final_message() has been received.
291  * Depending on the structure of the subscriber code, this can be handled by
292  * using stasis_subscription_final_message() to free resources on the final
293  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
294  * block until the unsubscribe has completed.
295  */
296
297 /*! Initial size of the subscribers list. */
298 #define INITIAL_SUBSCRIBERS_MAX 4
299
300 /*! The number of buckets to use for topic pools */
301 #define TOPIC_POOL_BUCKETS 57
302
303 /*! Thread pool for topics that don't want a dedicated taskprocessor */
304 static struct ast_threadpool *pool;
305
306 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
307
308 /*! \internal */
309 struct stasis_topic {
310         char *name;
311         /*! Variable length array of the subscribers */
312         AST_VECTOR(, struct stasis_subscription *) subscribers;
313
314         /*! Topics forwarding into this topic */
315         AST_VECTOR(, struct stasis_topic *) upstream_topics;
316 };
317
318 /* Forward declarations for the tightly-coupled subscription object */
319 static int topic_add_subscription(struct stasis_topic *topic,
320         struct stasis_subscription *sub);
321
322 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
323
324 /*! \brief Lock two topics. */
325 #define topic_lock_both(topic1, topic2) \
326         do { \
327                 ao2_lock(topic1); \
328                 while (ao2_trylock(topic2)) { \
329                         AO2_DEADLOCK_AVOIDANCE(topic1); \
330                 } \
331         } while (0)
332
333 static void topic_dtor(void *obj)
334 {
335         struct stasis_topic *topic = obj;
336
337         /* Subscribers hold a reference to topics, so they should all be
338          * unsubscribed before we get here. */
339         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
340
341         ast_free(topic->name);
342         topic->name = NULL;
343
344         AST_VECTOR_FREE(&topic->subscribers);
345         AST_VECTOR_FREE(&topic->upstream_topics);
346 }
347
348 struct stasis_topic *stasis_topic_create(const char *name)
349 {
350         struct stasis_topic *topic;
351         int res = 0;
352
353         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
354         if (!topic) {
355                 return NULL;
356         }
357
358         topic->name = ast_strdup(name);
359         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
360         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
361         if (!topic->name || res) {
362                 ao2_cleanup(topic);
363                 return NULL;
364         }
365
366         return topic;
367 }
368
369 const char *stasis_topic_name(const struct stasis_topic *topic)
370 {
371         return topic->name;
372 }
373
374 /*! \internal */
375 struct stasis_subscription {
376         /*! Unique ID for this subscription */
377         char uniqueid[AST_UUID_STR_LEN];
378         /*! Topic subscribed to. */
379         struct stasis_topic *topic;
380         /*! Mailbox for processing incoming messages. */
381         struct ast_taskprocessor *mailbox;
382         /*! Callback function for incoming message processing. */
383         stasis_subscription_cb callback;
384         /*! Data pointer to be handed to the callback. */
385         void *data;
386
387         /*! Condition for joining with subscription. */
388         ast_cond_t join_cond;
389         /*! Flag set when final message for sub has been received.
390          *  Be sure join_lock is held before reading/setting. */
391         int final_message_rxed;
392         /*! Flag set when final message for sub has been processed.
393          *  Be sure join_lock is held before reading/setting. */
394         int final_message_processed;
395 };
396
397 static void subscription_dtor(void *obj)
398 {
399         struct stasis_subscription *sub = obj;
400
401         /* Subscriptions need to be manually unsubscribed before destruction
402          * b/c there's a cyclic reference between topics and subscriptions */
403         ast_assert(!stasis_subscription_is_subscribed(sub));
404         /* If there are any messages in flight to this subscription; that would
405          * be bad. */
406         ast_assert(stasis_subscription_is_done(sub));
407
408         ao2_cleanup(sub->topic);
409         sub->topic = NULL;
410         ast_taskprocessor_unreference(sub->mailbox);
411         sub->mailbox = NULL;
412         ast_cond_destroy(&sub->join_cond);
413 }
414
415 /*!
416  * \brief Invoke the subscription's callback.
417  * \param sub Subscription to invoke.
418  * \param topic Topic message was published to.
419  * \param message Message to send.
420  */
421 static void subscription_invoke(struct stasis_subscription *sub,
422                                   struct stasis_message *message)
423 {
424         /* Notify that the final message has been received */
425         if (stasis_subscription_final_message(sub, message)) {
426                 SCOPED_AO2LOCK(lock, sub);
427
428                 sub->final_message_rxed = 1;
429                 ast_cond_signal(&sub->join_cond);
430         }
431
432         /* Since sub is mostly immutable, no need to lock sub */
433         sub->callback(sub->data, sub, message);
434
435         /* Notify that the final message has been processed */
436         if (stasis_subscription_final_message(sub, message)) {
437                 SCOPED_AO2LOCK(lock, sub);
438
439                 sub->final_message_processed = 1;
440                 ast_cond_signal(&sub->join_cond);
441         }
442 }
443
444 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
445 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
446
447 void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
448 {
449 }
450
451 struct stasis_subscription *internal_stasis_subscribe(
452         struct stasis_topic *topic,
453         stasis_subscription_cb callback,
454         void *data,
455         int needs_mailbox,
456         int use_thread_pool)
457 {
458         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
459
460         if (!topic) {
461                 return NULL;
462         }
463
464         /* The ao2 lock is used for join_cond. */
465         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, topic->name);
466         if (!sub) {
467                 return NULL;
468         }
469         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
470
471         if (needs_mailbox) {
472                 /* With a small number of subscribers, a thread-per-sub is
473                  * acceptable. For larger number of subscribers, a thread
474                  * pool should be used.
475                  */
476                 if (use_thread_pool) {
477                         sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
478                 } else {
479                         sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
480                                 TPS_REF_DEFAULT);
481                 }
482                 if (!sub->mailbox) {
483                         return NULL;
484                 }
485                 ast_taskprocessor_set_local(sub->mailbox, sub);
486                 /* Taskprocessor has a reference */
487                 ao2_ref(sub, +1);
488         }
489
490         ao2_ref(topic, +1);
491         sub->topic = topic;
492         sub->callback = callback;
493         sub->data = data;
494         ast_cond_init(&sub->join_cond, NULL);
495
496         if (topic_add_subscription(topic, sub) != 0) {
497                 return NULL;
498         }
499         send_subscription_subscribe(topic, sub);
500
501         ao2_ref(sub, +1);
502         return sub;
503 }
504
505 struct stasis_subscription *stasis_subscribe(
506         struct stasis_topic *topic,
507         stasis_subscription_cb callback,
508         void *data)
509 {
510         return internal_stasis_subscribe(topic, callback, data, 1, 0);
511 }
512
513 struct stasis_subscription *stasis_subscribe_pool(
514         struct stasis_topic *topic,
515         stasis_subscription_cb callback,
516         void *data)
517 {
518         return internal_stasis_subscribe(topic, callback, data, 1, 1);
519 }
520
521 static int sub_cleanup(void *data)
522 {
523         struct stasis_subscription *sub = data;
524         ao2_cleanup(sub);
525         return 0;
526 }
527
528 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
529 {
530         /* The subscription may be the last ref to this topic. Hold
531          * the topic ref open until after the unlock. */
532         RAII_VAR(struct stasis_topic *, topic,
533                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
534
535         if (!sub) {
536                 return NULL;
537         }
538
539         /* We have to remove the subscription first, to ensure the unsubscribe
540          * is the final message */
541         if (topic_remove_subscription(sub->topic, sub) != 0) {
542                 ast_log(LOG_ERROR,
543                         "Internal error: subscription has invalid topic\n");
544                 return NULL;
545         }
546
547         /* Now let everyone know about the unsubscribe */
548         send_subscription_unsubscribe(topic, sub);
549
550         /* When all that's done, remove the ref the mailbox has on the sub */
551         if (sub->mailbox) {
552                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
553         }
554
555         /* Unsubscribing unrefs the subscription */
556         ao2_cleanup(sub);
557         return NULL;
558 }
559
560 void stasis_subscription_join(struct stasis_subscription *subscription)
561 {
562         if (subscription) {
563                 SCOPED_AO2LOCK(lock, subscription);
564
565                 /* Wait until the processed flag has been set */
566                 while (!subscription->final_message_processed) {
567                         ast_cond_wait(&subscription->join_cond,
568                                 ao2_object_get_lockaddr(subscription));
569                 }
570         }
571 }
572
573 int stasis_subscription_is_done(struct stasis_subscription *subscription)
574 {
575         if (subscription) {
576                 SCOPED_AO2LOCK(lock, subscription);
577
578                 return subscription->final_message_rxed;
579         }
580
581         /* Null subscription is about as done as you can get */
582         return 1;
583 }
584
585 struct stasis_subscription *stasis_unsubscribe_and_join(
586         struct stasis_subscription *subscription)
587 {
588         if (!subscription) {
589                 return NULL;
590         }
591
592         /* Bump refcount to hold it past the unsubscribe */
593         ao2_ref(subscription, +1);
594         stasis_unsubscribe(subscription);
595         stasis_subscription_join(subscription);
596         /* Now decrement the refcount back */
597         ao2_cleanup(subscription);
598         return NULL;
599 }
600
601 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
602 {
603         if (sub) {
604                 size_t i;
605                 struct stasis_topic *topic = sub->topic;
606                 SCOPED_AO2LOCK(lock_topic, topic);
607
608                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
609                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
610                                 return 1;
611                         }
612                 }
613         }
614
615         return 0;
616 }
617
618 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
619 {
620         return sub->uniqueid;
621 }
622
623 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
624 {
625         struct stasis_subscription_change *change;
626
627         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
628                 return 0;
629         }
630
631         change = stasis_message_data(msg);
632         if (strcmp("Unsubscribe", change->description)) {
633                 return 0;
634         }
635
636         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
637                 return 0;
638         }
639
640         return 1;
641 }
642
643 /*!
644  * \brief Add a subscriber to a topic.
645  * \param topic Topic
646  * \param sub Subscriber
647  * \return 0 on success
648  * \return Non-zero on error
649  */
650 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
651 {
652         size_t idx;
653         SCOPED_AO2LOCK(lock, topic);
654
655         /* The reference from the topic to the subscription is shared with
656          * the owner of the subscription, which will explicitly unsubscribe
657          * to release it.
658          *
659          * If we bumped the refcount here, the owner would have to unsubscribe
660          * and cleanup, which is a bit awkward. */
661         AST_VECTOR_APPEND(&topic->subscribers, sub);
662
663         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
664                 topic_add_subscription(
665                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
666         }
667
668         return 0;
669 }
670
671 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
672 {
673         size_t idx;
674         SCOPED_AO2LOCK(lock_topic, topic);
675
676         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
677                 topic_remove_subscription(
678                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
679         }
680
681         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
682                 AST_VECTOR_ELEM_CLEANUP_NOOP);
683 }
684
685 /*!
686  * \internal \brief Dispatch a message to a subscriber asynchronously
687  * \param local \ref ast_taskprocessor_local object
688  * \return 0
689  */
690 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
691 {
692         struct stasis_subscription *sub = local->local_data;
693         struct stasis_message *message = local->data;
694
695         subscription_invoke(sub, message);
696         ao2_cleanup(message);
697
698         return 0;
699 }
700
701 /*!
702  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
703  * a published message to a subscriber
704  */
705 struct sync_task_data {
706         ast_mutex_t lock;
707         ast_cond_t cond;
708         int complete;
709         void *task_data;
710 };
711
712 /*!
713  * \internal \brief Dispatch a message to a subscriber synchronously
714  * \param local \ref ast_taskprocessor_local object
715  * \return 0
716  */
717 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
718 {
719         struct stasis_subscription *sub = local->local_data;
720         struct sync_task_data *std = local->data;
721         struct stasis_message *message = std->task_data;
722
723         subscription_invoke(sub, message);
724         ao2_cleanup(message);
725
726         ast_mutex_lock(&std->lock);
727         std->complete = 1;
728         ast_cond_signal(&std->cond);
729         ast_mutex_unlock(&std->lock);
730
731         return 0;
732 }
733
734 /*!
735  * \internal \brief Dispatch a message to a subscriber
736  * \param sub The subscriber to dispatch to
737  * \param message The message to send
738  * \param synchronous If non-zero, synchronize on the subscriber receiving
739  * the message
740  */
741 static void dispatch_message(struct stasis_subscription *sub,
742         struct stasis_message *message,
743         int synchronous)
744 {
745         if (!sub->mailbox) {
746                 /* Dispatch directly */
747                 subscription_invoke(sub, message);
748                 return;
749         }
750
751         /* Bump the message for the taskprocessor push. This will get de-ref'd
752          * by the task processor callback.
753          */
754         ao2_bump(message);
755         if (!synchronous) {
756                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
757                         /* Push failed; ugh. */
758                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
759                         ao2_cleanup(message);
760                 }
761         } else {
762                 struct sync_task_data std;
763
764                 ast_mutex_init(&std.lock);
765                 ast_cond_init(&std.cond, NULL);
766                 std.complete = 0;
767                 std.task_data = message;
768
769                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
770                         /* Push failed; ugh. */
771                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
772                         ao2_cleanup(message);
773                         ast_mutex_destroy(&std.lock);
774                         ast_cond_destroy(&std.cond);
775                         return;
776                 }
777
778                 ast_mutex_lock(&std.lock);
779                 while (!std.complete) {
780                         ast_cond_wait(&std.cond, &std.lock);
781                 }
782                 ast_mutex_unlock(&std.lock);
783
784                 ast_mutex_destroy(&std.lock);
785                 ast_cond_destroy(&std.cond);
786         }
787 }
788
789 /*!
790  * \internal \brief Publish a message to a topic's subscribers
791  * \brief topic The topic to publish to
792  * \brief message The message to publish
793  * \brief sync_sub An optional subscriber of the topic to publish synchronously
794  * to
795  */
796 static void publish_msg(struct stasis_topic *topic,
797         struct stasis_message *message, struct stasis_subscription *sync_sub)
798 {
799         size_t i;
800
801         ast_assert(topic != NULL);
802         ast_assert(message != NULL);
803
804         /*
805          * The topic may be unref'ed by the subscription invocation.
806          * Make sure we hold onto a reference while dispatching.
807          */
808         ao2_ref(topic, +1);
809         ao2_lock(topic);
810         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
811                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
812
813                 ast_assert(sub != NULL);
814
815                 dispatch_message(sub, message, (sub == sync_sub));
816         }
817         ao2_unlock(topic);
818         ao2_ref(topic, -1);
819 }
820
821 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
822 {
823         publish_msg(topic, message, NULL);
824 }
825
826 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
827 {
828         ast_assert(sub != NULL);
829
830         publish_msg(sub->topic, message, sub);
831 }
832
833 /*!
834  * \brief Forwarding information
835  *
836  * Any message posted to \a from_topic is forwarded to \a to_topic.
837  *
838  * In cases where both the \a from_topic and \a to_topic need to be locked,
839  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
840  */
841 struct stasis_forward {
842         /*! Originating topic */
843         struct stasis_topic *from_topic;
844         /*! Destination topic */
845         struct stasis_topic *to_topic;
846 };
847
848 static void forward_dtor(void *obj)
849 {
850         struct stasis_forward *forward = obj;
851
852         ao2_cleanup(forward->from_topic);
853         forward->from_topic = NULL;
854         ao2_cleanup(forward->to_topic);
855         forward->to_topic = NULL;
856 }
857
858 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
859 {
860         int idx;
861         struct stasis_topic *from;
862         struct stasis_topic *to;
863
864         if (!forward) {
865                 return NULL;
866         }
867
868         from = forward->from_topic;
869         to = forward->to_topic;
870
871         if (from && to) {
872                 topic_lock_both(to, from);
873                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
874                         AST_VECTOR_ELEM_CLEANUP_NOOP);
875
876                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
877                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
878                 }
879                 ao2_unlock(from);
880                 ao2_unlock(to);
881         }
882
883         ao2_cleanup(forward);
884
885         return NULL;
886 }
887
888 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
889         struct stasis_topic *to_topic)
890 {
891         int res;
892         size_t idx;
893         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
894
895         if (!from_topic || !to_topic) {
896                 return NULL;
897         }
898
899         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
900         if (!forward) {
901                 return NULL;
902         }
903
904         /* Forwards to ourselves are implicit. */
905         if (to_topic == from_topic) {
906                 return ao2_bump(forward);
907         }
908
909         forward->from_topic = ao2_bump(from_topic);
910         forward->to_topic = ao2_bump(to_topic);
911
912         topic_lock_both(to_topic, from_topic);
913         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
914         if (res != 0) {
915                 ao2_unlock(from_topic);
916                 ao2_unlock(to_topic);
917                 return NULL;
918         }
919
920         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
921                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
922         }
923         ao2_unlock(from_topic);
924         ao2_unlock(to_topic);
925
926         return ao2_bump(forward);
927 }
928
929 static void subscription_change_dtor(void *obj)
930 {
931         struct stasis_subscription_change *change = obj;
932
933         ast_string_field_free_memory(change);
934         ao2_cleanup(change->topic);
935 }
936
937 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
938 {
939         struct stasis_subscription_change *change;
940
941         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
942         if (!change || ast_string_field_init(change, 128)) {
943                 ao2_cleanup(change);
944                 return NULL;
945         }
946
947         ast_string_field_set(change, uniqueid, uniqueid);
948         ast_string_field_set(change, description, description);
949         ao2_ref(topic, +1);
950         change->topic = topic;
951
952         return change;
953 }
954
955 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
956 {
957         struct stasis_subscription_change *change;
958         struct stasis_message *msg;
959
960         /* This assumes that we have already unsubscribed */
961         ast_assert(stasis_subscription_is_subscribed(sub));
962
963         if (!stasis_subscription_change_type()) {
964                 return;
965         }
966
967         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
968         if (!change) {
969                 return;
970         }
971
972         msg = stasis_message_create(stasis_subscription_change_type(), change);
973         if (!msg) {
974                 ao2_cleanup(change);
975                 return;
976         }
977
978         stasis_publish(topic, msg);
979         ao2_cleanup(msg);
980         ao2_cleanup(change);
981 }
982
983 static void send_subscription_unsubscribe(struct stasis_topic *topic,
984         struct stasis_subscription *sub)
985 {
986         struct stasis_subscription_change *change;
987         struct stasis_message *msg;
988
989         /* This assumes that we have already unsubscribed */
990         ast_assert(!stasis_subscription_is_subscribed(sub));
991
992         if (!stasis_subscription_change_type()) {
993                 return;
994         }
995
996         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
997         if (!change) {
998                 return;
999         }
1000
1001         msg = stasis_message_create(stasis_subscription_change_type(), change);
1002         if (!msg) {
1003                 ao2_cleanup(change);
1004                 return;
1005         }
1006
1007         stasis_publish(topic, msg);
1008
1009         /* Now we have to dispatch to the subscription itself */
1010         dispatch_message(sub, msg, 0);
1011
1012         ao2_cleanup(msg);
1013         ao2_cleanup(change);
1014 }
1015
1016 struct topic_pool_entry {
1017         struct stasis_forward *forward;
1018         struct stasis_topic *topic;
1019 };
1020
1021 static void topic_pool_entry_dtor(void *obj)
1022 {
1023         struct topic_pool_entry *entry = obj;
1024
1025         entry->forward = stasis_forward_cancel(entry->forward);
1026         ao2_cleanup(entry->topic);
1027         entry->topic = NULL;
1028 }
1029
1030 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1031 {
1032         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1033                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1034 }
1035
1036 struct stasis_topic_pool {
1037         struct ao2_container *pool_container;
1038         struct stasis_topic *pool_topic;
1039 };
1040
1041 static void topic_pool_dtor(void *obj)
1042 {
1043         struct stasis_topic_pool *pool = obj;
1044
1045         ao2_cleanup(pool->pool_container);
1046         pool->pool_container = NULL;
1047         ao2_cleanup(pool->pool_topic);
1048         pool->pool_topic = NULL;
1049 }
1050
1051 static int topic_pool_entry_hash(const void *obj, const int flags)
1052 {
1053         const struct topic_pool_entry *object;
1054         const char *key;
1055
1056         switch (flags & OBJ_SEARCH_MASK) {
1057         case OBJ_SEARCH_KEY:
1058                 key = obj;
1059                 break;
1060         case OBJ_SEARCH_OBJECT:
1061                 object = obj;
1062                 key = stasis_topic_name(object->topic);
1063                 break;
1064         default:
1065                 /* Hash can only work on something with a full key. */
1066                 ast_assert(0);
1067                 return 0;
1068         }
1069         return ast_str_case_hash(key);
1070 }
1071
1072 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1073 {
1074         const struct topic_pool_entry *object_left = obj;
1075         const struct topic_pool_entry *object_right = arg;
1076         const char *right_key = arg;
1077         int cmp;
1078
1079         switch (flags & OBJ_SEARCH_MASK) {
1080         case OBJ_SEARCH_OBJECT:
1081                 right_key = stasis_topic_name(object_right->topic);
1082                 /* Fall through */
1083         case OBJ_SEARCH_KEY:
1084                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1085                 break;
1086         case OBJ_SEARCH_PARTIAL_KEY:
1087                 /* Not supported by container */
1088                 ast_assert(0);
1089                 cmp = -1;
1090                 break;
1091         default:
1092                 /*
1093                  * What arg points to is specific to this traversal callback
1094                  * and has no special meaning to astobj2.
1095                  */
1096                 cmp = 0;
1097                 break;
1098         }
1099         if (cmp) {
1100                 return 0;
1101         }
1102         /*
1103          * At this point the traversal callback is identical to a sorted
1104          * container.
1105          */
1106         return CMP_MATCH;
1107 }
1108
1109 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1110 {
1111         struct stasis_topic_pool *pool;
1112
1113         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1114         if (!pool) {
1115                 return NULL;
1116         }
1117
1118         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1119                 topic_pool_entry_hash, topic_pool_entry_cmp);
1120         if (!pool->pool_container) {
1121                 ao2_cleanup(pool);
1122                 return NULL;
1123         }
1124         ao2_ref(pooled_topic, +1);
1125         pool->pool_topic = pooled_topic;
1126
1127         return pool;
1128 }
1129
1130 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1131 {
1132         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1133         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1134
1135         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1136         if (topic_pool_entry) {
1137                 return topic_pool_entry->topic;
1138         }
1139
1140         topic_pool_entry = topic_pool_entry_alloc();
1141         if (!topic_pool_entry) {
1142                 return NULL;
1143         }
1144
1145         topic_pool_entry->topic = stasis_topic_create(topic_name);
1146         if (!topic_pool_entry->topic) {
1147                 return NULL;
1148         }
1149
1150         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1151         if (!topic_pool_entry->forward) {
1152                 return NULL;
1153         }
1154
1155         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1156                 return NULL;
1157         }
1158
1159         return topic_pool_entry->topic;
1160 }
1161
1162 void stasis_log_bad_type_access(const char *name)
1163 {
1164 #ifdef AST_DEVMODE
1165         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1166 #endif
1167 }
1168
1169 /*! \brief A multi object blob data structure to carry user event stasis messages */
1170 struct ast_multi_object_blob {
1171         struct ast_json *blob;                             /*< A blob of JSON data */
1172         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1173 };
1174
1175 /*!
1176  * \internal
1177  * \brief Destructor for \ref ast_multi_object_blob objects
1178  */
1179 static void multi_object_blob_dtor(void *obj)
1180 {
1181         struct ast_multi_object_blob *multi = obj;
1182         int type;
1183         int i;
1184
1185         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1186                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1187                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1188                 }
1189                 AST_VECTOR_FREE(&multi->snapshots[type]);
1190         }
1191         ast_json_unref(multi->blob);
1192 }
1193
1194 /*! \brief Create a stasis user event multi object blob */
1195 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1196 {
1197         int type;
1198         RAII_VAR(struct ast_multi_object_blob *, multi,
1199                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1200                         ao2_cleanup);
1201
1202         ast_assert(blob != NULL);
1203
1204         if (!multi) {
1205                 return NULL;
1206         }
1207
1208         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1209                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1210                         return NULL;
1211                 }
1212         }
1213
1214         multi->blob = ast_json_ref(blob);
1215
1216         ao2_ref(multi, +1);
1217         return multi;
1218 }
1219
1220 /*! \brief Add an object (snapshot) to the blob */
1221 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1222         enum stasis_user_multi_object_snapshot_type type, void *object)
1223 {
1224         if (!multi || !object) {
1225                 return;
1226         }
1227         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1228 }
1229
1230 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1231 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1232         struct stasis_message_type *type, struct ast_json *blob)
1233 {
1234         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1235         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1236         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1237
1238         if (!type) {
1239                 return;
1240         }
1241
1242         multi = ast_multi_object_blob_create(blob);
1243         if (!multi) {
1244                 return;
1245         }
1246
1247         channel_snapshot = ast_channel_snapshot_create(chan);
1248         ao2_ref(channel_snapshot, +1);
1249         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1250
1251         message = stasis_message_create(type, multi);
1252         if (message) {
1253                 /* app_userevent still publishes to channel */
1254                 stasis_publish(ast_channel_topic(chan), message);
1255         }
1256 }
1257
1258 /*! \internal \brief convert multi object blob to ari json */
1259 static struct ast_json *multi_user_event_to_json(
1260         struct stasis_message *message,
1261         const struct stasis_message_sanitizer *sanitize)
1262 {
1263         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1264         struct ast_multi_object_blob *multi = stasis_message_data(message);
1265         struct ast_json *blob = multi->blob;
1266         const struct timeval *tv = stasis_message_timestamp(message);
1267         enum stasis_user_multi_object_snapshot_type type;
1268         int i;
1269
1270         out = ast_json_object_create();
1271         if (!out) {
1272                 return NULL;
1273         }
1274
1275         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1276         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1277         ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
1278         ast_json_object_set(out, "userevent", ast_json_ref(blob)); /* eventname gets duplicated, that's ok */
1279
1280         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1281                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1282                         struct ast_json *json_object = NULL;
1283                         char *name = NULL;
1284                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1285
1286                         switch (type) {
1287                         case STASIS_UMOS_CHANNEL:
1288                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1289                                 name = "channel";
1290                                 break;
1291                         case STASIS_UMOS_BRIDGE:
1292                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1293                                 name = "bridge";
1294                                 break;
1295                         case STASIS_UMOS_ENDPOINT:
1296                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1297                                 name = "endpoint";
1298                                 break;
1299                         }
1300                         if (json_object) {
1301                                 ast_json_object_set(out, name, json_object);
1302                         }
1303                 }
1304         }
1305         return ast_json_ref(out);
1306 }
1307
1308 /*! \internal \brief convert multi object blob to ami string */
1309 static struct ast_str *multi_object_blob_to_ami(void *obj)
1310 {
1311         struct ast_str *ami_str=ast_str_create(1024);
1312         struct ast_str *ami_snapshot;
1313         const struct ast_multi_object_blob *multi = obj;
1314         enum stasis_user_multi_object_snapshot_type type;
1315         int i;
1316
1317         if (!ami_str) {
1318                 return NULL;
1319         }
1320         if (!multi) {
1321                 ast_free(ami_str);
1322                 return NULL;
1323         }
1324
1325         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1326                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1327                         char *name = "";
1328                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1329                         ami_snapshot = NULL;
1330
1331                         if (i > 0) {
1332                                 ast_asprintf(&name, "%d", i + 1);
1333                         }
1334
1335                         switch (type) {
1336                         case STASIS_UMOS_CHANNEL:
1337                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1338                                 break;
1339
1340                         case STASIS_UMOS_BRIDGE:
1341                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1342                                 break;
1343
1344                         case STASIS_UMOS_ENDPOINT:
1345                                 /* currently not sending endpoint snapshots to AMI */
1346                                 break;
1347                         }
1348                         if (ami_snapshot) {
1349                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1350                                 ast_free(ami_snapshot);
1351                         }
1352                 }
1353         }
1354
1355         return ami_str;
1356 }
1357
1358 /*! \internal \brief Callback to pass only user defined parameters from blob */
1359 static int userevent_exclusion_cb(const char *key)
1360 {
1361         if (!strcmp("eventname", key)) {
1362                 return 1;
1363         }
1364         return 0;
1365 }
1366
1367 static struct ast_manager_event_blob *multi_user_event_to_ami(
1368         struct stasis_message *message)
1369 {
1370         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1371         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1372         struct ast_multi_object_blob *multi = stasis_message_data(message);
1373         const char *eventname;
1374
1375         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1376         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1377         object_string = multi_object_blob_to_ami(multi);
1378         if (!object_string || !body) {
1379                 return NULL;
1380         }
1381
1382         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1383                 "%s"
1384                 "UserEvent: %s\r\n"
1385                 "%s",
1386                 ast_str_buffer(object_string),
1387                 eventname,
1388                 ast_str_buffer(body));
1389 }
1390
1391 /*! \brief A structure to hold global configuration-related options */
1392 struct stasis_declined_config {
1393         /*! The list of message types to decline */
1394         struct ao2_container *declined;
1395 };
1396
1397 /*! \brief Threadpool configuration options */
1398 struct stasis_threadpool_conf {
1399         /*! Initial size of the thread pool */
1400         int initial_size;
1401         /*! Time, in seconds, before we expire a thread */
1402         int idle_timeout_sec;
1403         /*! Maximum number of thread to allow */
1404         int max_size;
1405 };
1406
1407 struct stasis_config {
1408         /*! Thread pool configuration options */
1409         struct stasis_threadpool_conf *threadpool_options;
1410         /*! Declined message types */
1411         struct stasis_declined_config *declined_message_types;
1412 };
1413
1414 static struct aco_type threadpool_option = {
1415         .type = ACO_GLOBAL,
1416         .name = "threadpool",
1417         .item_offset = offsetof(struct stasis_config, threadpool_options),
1418         .category = "^threadpool$",
1419         .category_match = ACO_WHITELIST,
1420 };
1421
1422 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1423
1424 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1425 static struct aco_type declined_option = {
1426         .type = ACO_GLOBAL,
1427         .name = "declined_message_types",
1428         .item_offset = offsetof(struct stasis_config, declined_message_types),
1429         .category_match = ACO_WHITELIST,
1430         .category = "^declined_message_types$",
1431 };
1432
1433 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1434
1435 struct aco_file stasis_conf = {
1436         .filename = "stasis.conf",
1437         .types = ACO_TYPES(&declined_option, &threadpool_option),
1438 };
1439
1440 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1441 static AO2_GLOBAL_OBJ_STATIC(globals);
1442
1443 static void *stasis_config_alloc(void);
1444
1445 /*! \brief Register information about the configs being processed by this module */
1446 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1447         .files = ACO_FILES(&stasis_conf),
1448 );
1449
1450 static void stasis_declined_config_destructor(void *obj)
1451 {
1452         struct stasis_declined_config *declined = obj;
1453
1454         ao2_cleanup(declined->declined);
1455 }
1456
1457 static void stasis_config_destructor(void *obj)
1458 {
1459         struct stasis_config *cfg = obj;
1460
1461         ao2_cleanup(cfg->declined_message_types);
1462         ast_free(cfg->threadpool_options);
1463 }
1464
1465 static void *stasis_config_alloc(void)
1466 {
1467         struct stasis_config *cfg;
1468
1469         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1470                 return NULL;
1471         }
1472
1473         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1474         if (!cfg->threadpool_options) {
1475                 ao2_ref(cfg, -1);
1476                 return NULL;
1477         }
1478
1479         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1480                 stasis_declined_config_destructor);
1481         if (!cfg->declined_message_types) {
1482                 ao2_ref(cfg, -1);
1483                 return NULL;
1484         }
1485
1486         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1487         if (!cfg->declined_message_types->declined) {
1488                 ao2_ref(cfg, -1);
1489                 return NULL;
1490         }
1491
1492         return cfg;
1493 }
1494
1495 int stasis_message_type_declined(const char *name)
1496 {
1497         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1498         char *name_in_declined;
1499         int res;
1500
1501         if (!cfg || !cfg->declined_message_types) {
1502                 return 0;
1503         }
1504
1505         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1506         res = name_in_declined ? 1 : 0;
1507         ao2_cleanup(name_in_declined);
1508         if (res) {
1509                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1510         }
1511         return res;
1512 }
1513
1514 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1515 {
1516         struct stasis_declined_config *declined = obj;
1517
1518         if (ast_strlen_zero(var->value)) {
1519                 return 0;
1520         }
1521
1522         if (ast_str_container_add(declined->declined, var->value)) {
1523                 return -1;
1524         }
1525
1526         return 0;
1527 }
1528
1529 /*!
1530  * @{ \brief Define multi user event message type(s).
1531  */
1532
1533 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1534         .to_json = multi_user_event_to_json,
1535         .to_ami = multi_user_event_to_ami,
1536         );
1537
1538 /*! @} */
1539
1540 /*! \brief Cleanup function for graceful shutdowns */
1541 static void stasis_cleanup(void)
1542 {
1543         ast_threadpool_shutdown(pool);
1544         pool = NULL;
1545         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1546         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1547         aco_info_destroy(&cfg_info);
1548         ao2_global_obj_release(globals);
1549 }
1550
1551 int stasis_init(void)
1552 {
1553         RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
1554         int cache_init;
1555         struct ast_threadpool_options threadpool_opts = { 0, };
1556
1557         /* Be sure the types are cleaned up after the message bus */
1558         ast_register_cleanup(stasis_cleanup);
1559
1560         if (aco_info_init(&cfg_info)) {
1561                 return -1;
1562         }
1563
1564         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1565                 declined_options, "", declined_handler, 0);
1566         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1567                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1568                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1569                 INT_MAX);
1570         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1571                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1572                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1573                 INT_MAX);
1574         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1575                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1576                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1577                 INT_MAX);
1578
1579         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1580                 struct stasis_config *default_cfg = stasis_config_alloc();
1581
1582                 if (!default_cfg) {
1583                         return -1;
1584                 }
1585
1586                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1587                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1588                         ao2_ref(default_cfg, -1);
1589                         return -1;
1590                 }
1591
1592                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1593                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1594                         return -1;
1595                 }
1596
1597                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1598                 ao2_global_obj_replace_unref(globals, default_cfg);
1599                 cfg = default_cfg;
1600         } else {
1601                 cfg = ao2_global_obj_ref(globals);
1602                 if (!cfg) {
1603                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1604                         return -1;
1605                 }
1606         }
1607
1608         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1609         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1610         threadpool_opts.auto_increment = 1;
1611         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1612         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1613         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1614         if (!pool) {
1615                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1616                 return -1;
1617         }
1618
1619         cache_init = stasis_cache_init();
1620         if (cache_init != 0) {
1621                 return -1;
1622         }
1623
1624         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1625                 return -1;
1626         }
1627         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1628                 return -1;
1629         }
1630
1631         return 0;
1632 }
1633