manager: Add <see-also> tags to relate UserEvent actions/apps/events
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE();
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/threadpool.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/vector.h"
42 #include "asterisk/stasis_channels.h"
43 #include "asterisk/stasis_bridges.h"
44 #include "asterisk/stasis_endpoints.h"
45 #include "asterisk/config_options.h"
46
47 /*** DOCUMENTATION
48         <managerEvent language="en_US" name="UserEvent">
49                 <managerEventInstance class="EVENT_FLAG_USER">
50                         <synopsis>A user defined event raised from the dialplan.</synopsis>
51                         <syntax>
52                                 <channel_snapshot/>
53                                 <parameter name="UserEvent">
54                                         <para>The event name, as specified in the dialplan.</para>
55                                 </parameter>
56                         </syntax>
57                         <description>
58                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
59                         </description>
60                         <see-also>
61                                 <ref type="application">UserEvent</ref>
62                                 <ref type="managerEvent">UserEvent</ref>
63                         </see-also>
64                 </managerEventInstance>
65         </managerEvent>
66         <configInfo name="stasis" language="en_US">
67                 <configFile name="stasis.conf">
68                         <configObject name="threadpool">
69                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
70                                 <configOption name="initial_size" default="5">
71                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
72                                 </configOption>
73                                 <configOption name="idle_timeout_sec" default="20">
74                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
75                                 </configOption>
76                                 <configOption name="max_size" default="50">
77                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
78                                 </configOption>
79                         </configObject>
80                         <configObject name="declined_message_types">
81                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
82                                 <configOption name="decline">
83                                         <synopsis>The message type to decline.</synopsis>
84                                         <description>
85                                                 <para>This configuration option defines the name of the Stasis
86                                                 message type that Asterisk is forbidden from creating and can be
87                                                 specified as many times as necessary to achieve the desired result.</para>
88                                                 <enumlist>
89                                                         <enum name="stasis_app_recording_snapshot_type" />
90                                                         <enum name="stasis_app_playback_snapshot_type" />
91                                                         <enum name="stasis_test_message_type" />
92                                                         <enum name="confbridge_start_type" />
93                                                         <enum name="confbridge_end_type" />
94                                                         <enum name="confbridge_join_type" />
95                                                         <enum name="confbridge_leave_type" />
96                                                         <enum name="confbridge_start_record_type" />
97                                                         <enum name="confbridge_stop_record_type" />
98                                                         <enum name="confbridge_mute_type" />
99                                                         <enum name="confbridge_unmute_type" />
100                                                         <enum name="confbridge_talking_type" />
101                                                         <enum name="cel_generic_type" />
102                                                         <enum name="ast_bridge_snapshot_type" />
103                                                         <enum name="ast_bridge_merge_message_type" />
104                                                         <enum name="ast_channel_entered_bridge_type" />
105                                                         <enum name="ast_channel_left_bridge_type" />
106                                                         <enum name="ast_blind_transfer_type" />
107                                                         <enum name="ast_attended_transfer_type" />
108                                                         <enum name="ast_endpoint_snapshot_type" />
109                                                         <enum name="ast_endpoint_state_type" />
110                                                         <enum name="ast_device_state_message_type" />
111                                                         <enum name="ast_test_suite_message_type" />
112                                                         <enum name="ast_mwi_state_type" />
113                                                         <enum name="ast_mwi_vm_app_type" />
114                                                         <enum name="ast_format_register_type" />
115                                                         <enum name="ast_format_unregister_type" />
116                                                         <enum name="ast_manager_get_generic_type" />
117                                                         <enum name="ast_parked_call_type" />
118                                                         <enum name="ast_channel_snapshot_type" />
119                                                         <enum name="ast_channel_dial_type" />
120                                                         <enum name="ast_channel_varset_type" />
121                                                         <enum name="ast_channel_hangup_request_type" />
122                                                         <enum name="ast_channel_dtmf_begin_type" />
123                                                         <enum name="ast_channel_dtmf_end_type" />
124                                                         <enum name="ast_channel_hold_type" />
125                                                         <enum name="ast_channel_unhold_type" />
126                                                         <enum name="ast_channel_chanspy_start_type" />
127                                                         <enum name="ast_channel_chanspy_stop_type" />
128                                                         <enum name="ast_channel_fax_type" />
129                                                         <enum name="ast_channel_hangup_handler_type" />
130                                                         <enum name="ast_channel_moh_start_type" />
131                                                         <enum name="ast_channel_moh_stop_type" />
132                                                         <enum name="ast_channel_monitor_start_type" />
133                                                         <enum name="ast_channel_monitor_stop_type" />
134                                                         <enum name="ast_channel_agent_login_type" />
135                                                         <enum name="ast_channel_agent_logoff_type" />
136                                                         <enum name="ast_channel_talking_start" />
137                                                         <enum name="ast_channel_talking_stop" />
138                                                         <enum name="ast_security_event_type" />
139                                                         <enum name="ast_named_acl_change_type" />
140                                                         <enum name="ast_local_bridge_type" />
141                                                         <enum name="ast_local_optimization_begin_type" />
142                                                         <enum name="ast_local_optimization_end_type" />
143                                                         <enum name="stasis_subscription_change_type" />
144                                                         <enum name="ast_multi_user_event_type" />
145                                                         <enum name="stasis_cache_clear_type" />
146                                                         <enum name="stasis_cache_update_type" />
147                                                         <enum name="ast_network_change_type" />
148                                                         <enum name="ast_system_registry_type" />
149                                                         <enum name="ast_cc_available_type" />
150                                                         <enum name="ast_cc_offertimerstart_type" />
151                                                         <enum name="ast_cc_requested_type" />
152                                                         <enum name="ast_cc_requestacknowledged_type" />
153                                                         <enum name="ast_cc_callerstopmonitoring_type" />
154                                                         <enum name="ast_cc_callerstartmonitoring_type" />
155                                                         <enum name="ast_cc_callerrecalling_type" />
156                                                         <enum name="ast_cc_recallcomplete_type" />
157                                                         <enum name="ast_cc_failure_type" />
158                                                         <enum name="ast_cc_monitorfailed_type" />
159                                                         <enum name="ast_presence_state_message_type" />
160                                                         <enum name="ast_rtp_rtcp_sent_type" />
161                                                         <enum name="ast_rtp_rtcp_received_type" />
162                                                         <enum name="ast_call_pickup_type" />
163                                                         <enum name="aoc_s_type" />
164                                                         <enum name="aoc_d_type" />
165                                                         <enum name="aoc_e_type" />
166                                                         <enum name="dahdichannel_type" />
167                                                         <enum name="mcid_type" />
168                                                         <enum name="session_timeout_type" />
169                                                         <enum name="cdr_read_message_type" />
170                                                         <enum name="cdr_write_message_type" />
171                                                         <enum name="cdr_prop_write_message_type" />
172                                                         <enum name="corosync_ping_message_type" />
173                                                         <enum name="agi_exec_start_type" />
174                                                         <enum name="agi_exec_end_type" />
175                                                         <enum name="agi_async_start_type" />
176                                                         <enum name="agi_async_exec_type" />
177                                                         <enum name="agi_async_end_type" />
178                                                         <enum name="queue_caller_join_type" />
179                                                         <enum name="queue_caller_leave_type" />
180                                                         <enum name="queue_caller_abandon_type" />
181                                                         <enum name="queue_member_status_type" />
182                                                         <enum name="queue_member_added_type" />
183                                                         <enum name="queue_member_removed_type" />
184                                                         <enum name="queue_member_pause_type" />
185                                                         <enum name="queue_member_penalty_type" />
186                                                         <enum name="queue_member_ringinuse_type" />
187                                                         <enum name="queue_agent_called_type" />
188                                                         <enum name="queue_agent_connect_type" />
189                                                         <enum name="queue_agent_complete_type" />
190                                                         <enum name="queue_agent_dump_type" />
191                                                         <enum name="queue_agent_ringnoanswer_type" />
192                                                         <enum name="meetme_join_type" />
193                                                         <enum name="meetme_leave_type" />
194                                                         <enum name="meetme_end_type" />
195                                                         <enum name="meetme_mute_type" />
196                                                         <enum name="meetme_talking_type" />
197                                                         <enum name="meetme_talk_request_type" />
198                                                         <enum name="appcdr_message_type" />
199                                                         <enum name="forkcdr_message_type" />
200                                                         <enum name="cdr_sync_message_type" />
201                                                 </enumlist>
202                                         </description>
203                                 </configOption>
204                         </configObject>
205                 </configFile>
206         </configInfo>
207 ***/
208
209 /*!
210  * \page stasis-impl Stasis Implementation Notes
211  *
212  * \par Reference counting
213  *
214  * Stasis introduces a number of objects, which are tightly related to one
215  * another. Because we rely on ref-counting for memory management, understanding
216  * these relationships is important to understanding this code.
217  *
218  * \code{.txt}
219  *
220  *   stasis_topic <----> stasis_subscription
221  *             ^          ^
222  *              \        /
223  *               \      /
224  *               dispatch
225  *                  |
226  *                  |
227  *                  v
228  *            stasis_message
229  *                  |
230  *                  |
231  *                  v
232  *          stasis_message_type
233  *
234  * \endcode
235  *
236  * The most troubling thing in this chart is the cyclic reference between
237  * stasis_topic and stasis_subscription. This is both unfortunate, and
238  * necessary. Topics need the subscription in order to dispatch messages;
239  * subscriptions need the topics to unsubscribe and check subscription status.
240  *
241  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
242  * topic's reference to a subscription. When the subscription is destroyed, it
243  * will remove its reference to the topic.
244  *
245  * This means that until a subscription has been explicitly unsubscribed, it will
246  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
247  * The destructors of both have assertions regarding this to catch ref-counting
248  * problems where a subscription or topic has had an extra ao2_cleanup().
249  *
250  * The \ref dispatch object is a transient object, which is posted to a
251  * subscription's taskprocessor to send a message to the subscriber. They have
252  * short life cycles, allocated on one thread, destroyed on another.
253  *
254  * During shutdown, or the deletion of a domain object, there is a flurry of
255  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
256  * are processed. Any one of these cleanups could be the one to actually destroy
257  * a given object, so care must be taken to ensure that an object isn't
258  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
259  * that might happen when a RAII_VAR() goes out of scope.
260  *
261  * \par Typical life cycles
262  *
263  *  \li stasis_topic - There are several topics which live for the duration of
264  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
265  *      are actually fed by shorter-lived topics whose lifetime is associated
266  *      with some domain object (like ast_channel_topic() for a given
267  *      ast_channel).
268  *
269  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
270  *      topics, for similar reasons.
271  *
272  *  \li dispatch - Very short lived; just long enough to post a message to a
273  *      subscriber.
274  *
275  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
276  *      irrelevant. Messages are strictly data and have no behavior associated
277  *      with them, so it doesn't really matter if/when they are destroyed. By
278  *      design, a component could hold a ref to a message forever without any
279  *      ill consequences (aside from consuming more memory).
280  *
281  *  \li stasis_message_type - Long life cycles, typically only destroyed on
282  *      module unloading or _clean_ process exit.
283  *
284  * \par Subscriber shutdown sequencing
285  *
286  * Subscribers are sensitive to shutdown sequencing, specifically in how they
287  * reference message types. This is fully detailed on the wiki at
288  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
289  *
290  * In short, the lifetime of the \a data (and \a callback, if in a module) must
291  * be held until the stasis_subscription_final_message() has been received.
292  * Depending on the structure of the subscriber code, this can be handled by
293  * using stasis_subscription_final_message() to free resources on the final
294  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
295  * block until the unsubscribe has completed.
296  */
297
298 /*! Initial size of the subscribers list. */
299 #define INITIAL_SUBSCRIBERS_MAX 4
300
301 /*! The number of buckets to use for topic pools */
302 #define TOPIC_POOL_BUCKETS 57
303
304 /*! Thread pool for topics that don't want a dedicated taskprocessor */
305 static struct ast_threadpool *pool;
306
307 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
308
309 /*! \internal */
310 struct stasis_topic {
311         char *name;
312         /*! Variable length array of the subscribers */
313         AST_VECTOR(, struct stasis_subscription *) subscribers;
314
315         /*! Topics forwarding into this topic */
316         AST_VECTOR(, struct stasis_topic *) upstream_topics;
317 };
318
319 /* Forward declarations for the tightly-coupled subscription object */
320 static int topic_add_subscription(struct stasis_topic *topic,
321         struct stasis_subscription *sub);
322
323 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
324
/*!
 * \brief Lock two topics.
 *
 * Takes the lock on \a topic1, then repeatedly tries to take the lock on
 * \a topic2 without blocking. After every failed attempt,
 * AO2_DEADLOCK_AVOIDANCE() is applied to \a topic1 before trying again.
 */
#define topic_lock_both(topic1, topic2) \
        do { \
                ao2_lock(topic1); \
                while (ao2_trylock(topic2) != 0) { \
                        AO2_DEADLOCK_AVOIDANCE(topic1); \
                } \
        } while (0)
333
334 static void topic_dtor(void *obj)
335 {
336         struct stasis_topic *topic = obj;
337
338         /* Subscribers hold a reference to topics, so they should all be
339          * unsubscribed before we get here. */
340         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
341
342         ast_free(topic->name);
343         topic->name = NULL;
344
345         AST_VECTOR_FREE(&topic->subscribers);
346         AST_VECTOR_FREE(&topic->upstream_topics);
347 }
348
349 struct stasis_topic *stasis_topic_create(const char *name)
350 {
351         struct stasis_topic *topic;
352         int res = 0;
353
354         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
355         if (!topic) {
356                 return NULL;
357         }
358
359         topic->name = ast_strdup(name);
360         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
361         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
362         if (!topic->name || res) {
363                 ao2_cleanup(topic);
364                 return NULL;
365         }
366
367         return topic;
368 }
369
370 const char *stasis_topic_name(const struct stasis_topic *topic)
371 {
372         return topic->name;
373 }
374
375 /*! \internal */
376 struct stasis_subscription {
377         /*! Unique ID for this subscription */
378         char uniqueid[AST_UUID_STR_LEN];
379         /*! Topic subscribed to. */
380         struct stasis_topic *topic;
381         /*! Mailbox for processing incoming messages. */
382         struct ast_taskprocessor *mailbox;
383         /*! Callback function for incoming message processing. */
384         stasis_subscription_cb callback;
385         /*! Data pointer to be handed to the callback. */
386         void *data;
387
388         /*! Condition for joining with subscription. */
389         ast_cond_t join_cond;
390         /*! Flag set when final message for sub has been received.
391          *  Be sure join_lock is held before reading/setting. */
392         int final_message_rxed;
393         /*! Flag set when final message for sub has been processed.
394          *  Be sure join_lock is held before reading/setting. */
395         int final_message_processed;
396 };
397
398 static void subscription_dtor(void *obj)
399 {
400         struct stasis_subscription *sub = obj;
401
402         /* Subscriptions need to be manually unsubscribed before destruction
403          * b/c there's a cyclic reference between topics and subscriptions */
404         ast_assert(!stasis_subscription_is_subscribed(sub));
405         /* If there are any messages in flight to this subscription; that would
406          * be bad. */
407         ast_assert(stasis_subscription_is_done(sub));
408
409         ao2_cleanup(sub->topic);
410         sub->topic = NULL;
411         ast_taskprocessor_unreference(sub->mailbox);
412         sub->mailbox = NULL;
413         ast_cond_destroy(&sub->join_cond);
414 }
415
416 /*!
417  * \brief Invoke the subscription's callback.
418  * \param sub Subscription to invoke.
419  * \param topic Topic message was published to.
420  * \param message Message to send.
421  */
422 static void subscription_invoke(struct stasis_subscription *sub,
423                                   struct stasis_message *message)
424 {
425         /* Notify that the final message has been received */
426         if (stasis_subscription_final_message(sub, message)) {
427                 SCOPED_AO2LOCK(lock, sub);
428
429                 sub->final_message_rxed = 1;
430                 ast_cond_signal(&sub->join_cond);
431         }
432
433         /* Since sub is mostly immutable, no need to lock sub */
434         sub->callback(sub->data, sub, message);
435
436         /* Notify that the final message has been processed */
437         if (stasis_subscription_final_message(sub, message)) {
438                 SCOPED_AO2LOCK(lock, sub);
439
440                 sub->final_message_processed = 1;
441                 ast_cond_signal(&sub->join_cond);
442         }
443 }
444
445 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
446 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
447
/*!
 * \brief Subscription callback that ignores every message.
 *
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
        /* Intentionally empty. */
}
451
452 struct stasis_subscription *internal_stasis_subscribe(
453         struct stasis_topic *topic,
454         stasis_subscription_cb callback,
455         void *data,
456         int needs_mailbox,
457         int use_thread_pool)
458 {
459         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
460
461         if (!topic) {
462                 return NULL;
463         }
464
465         /* The ao2 lock is used for join_cond. */
466         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
467         if (!sub) {
468                 return NULL;
469         }
470         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
471
472         if (needs_mailbox) {
473                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
474
475                 /* Create name with seq number appended. */
476                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
477                         use_thread_pool ? 'p' : 'm',
478                         stasis_topic_name(topic));
479
480                 /*
481                  * With a small number of subscribers, a thread-per-sub is
482                  * acceptable. For a large number of subscribers, a thread
483                  * pool should be used.
484                  */
485                 if (use_thread_pool) {
486                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
487                 } else {
488                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
489                 }
490                 if (!sub->mailbox) {
491                         return NULL;
492                 }
493                 ast_taskprocessor_set_local(sub->mailbox, sub);
494                 /* Taskprocessor has a reference */
495                 ao2_ref(sub, +1);
496         }
497
498         ao2_ref(topic, +1);
499         sub->topic = topic;
500         sub->callback = callback;
501         sub->data = data;
502         ast_cond_init(&sub->join_cond, NULL);
503
504         if (topic_add_subscription(topic, sub) != 0) {
505                 return NULL;
506         }
507         send_subscription_subscribe(topic, sub);
508
509         ao2_ref(sub, +1);
510         return sub;
511 }
512
513 struct stasis_subscription *stasis_subscribe(
514         struct stasis_topic *topic,
515         stasis_subscription_cb callback,
516         void *data)
517 {
518         return internal_stasis_subscribe(topic, callback, data, 1, 0);
519 }
520
521 struct stasis_subscription *stasis_subscribe_pool(
522         struct stasis_topic *topic,
523         stasis_subscription_cb callback,
524         void *data)
525 {
526         return internal_stasis_subscribe(topic, callback, data, 1, 1);
527 }
528
/*!
 * \internal
 * \brief Task that drops one reference on a subscription.
 *
 * \param data The subscription to release.
 *
 * \return 0 always.
 */
static int sub_cleanup(void *data)
{
        ao2_cleanup(data);

        return 0;
}
535
536 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
537 {
538         /* The subscription may be the last ref to this topic. Hold
539          * the topic ref open until after the unlock. */
540         RAII_VAR(struct stasis_topic *, topic,
541                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
542
543         if (!sub) {
544                 return NULL;
545         }
546
547         /* We have to remove the subscription first, to ensure the unsubscribe
548          * is the final message */
549         if (topic_remove_subscription(sub->topic, sub) != 0) {
550                 ast_log(LOG_ERROR,
551                         "Internal error: subscription has invalid topic\n");
552                 return NULL;
553         }
554
555         /* Now let everyone know about the unsubscribe */
556         send_subscription_unsubscribe(topic, sub);
557
558         /* When all that's done, remove the ref the mailbox has on the sub */
559         if (sub->mailbox) {
560                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
561         }
562
563         /* Unsubscribing unrefs the subscription */
564         ao2_cleanup(sub);
565         return NULL;
566 }
567
568 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
569         long low_water, long high_water)
570 {
571         int res = -1;
572
573         if (subscription) {
574                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
575                         low_water, high_water);
576         }
577         return res;
578 }
579
580 void stasis_subscription_join(struct stasis_subscription *subscription)
581 {
582         if (subscription) {
583                 SCOPED_AO2LOCK(lock, subscription);
584
585                 /* Wait until the processed flag has been set */
586                 while (!subscription->final_message_processed) {
587                         ast_cond_wait(&subscription->join_cond,
588                                 ao2_object_get_lockaddr(subscription));
589                 }
590         }
591 }
592
593 int stasis_subscription_is_done(struct stasis_subscription *subscription)
594 {
595         if (subscription) {
596                 SCOPED_AO2LOCK(lock, subscription);
597
598                 return subscription->final_message_rxed;
599         }
600
601         /* Null subscription is about as done as you can get */
602         return 1;
603 }
604
/*!
 * \brief Unsubscribe, then wait for the final message to be processed.
 *
 * \param subscription Subscription to cancel; may be NULL.
 *
 * \return NULL always, for convenience.
 */
struct stasis_subscription *stasis_unsubscribe_and_join(
        struct stasis_subscription *subscription)
{
        struct stasis_subscription *held;

        if (!subscription) {
                return NULL;
        }

        /* stasis_unsubscribe() gives up a reference; take a temporary one so
         * the object is still valid while joining. */
        held = ao2_bump(subscription);
        stasis_unsubscribe(subscription);
        stasis_subscription_join(held);
        /* Drop the temporary reference again. */
        ao2_cleanup(held);

        return NULL;
}
620
621 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
622 {
623         if (sub) {
624                 size_t i;
625                 struct stasis_topic *topic = sub->topic;
626                 SCOPED_AO2LOCK(lock_topic, topic);
627
628                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
629                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
630                                 return 1;
631                         }
632                 }
633         }
634
635         return 0;
636 }
637
638 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
639 {
640         return sub->uniqueid;
641 }
642
643 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
644 {
645         struct stasis_subscription_change *change;
646
647         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
648                 return 0;
649         }
650
651         change = stasis_message_data(msg);
652         if (strcmp("Unsubscribe", change->description)) {
653                 return 0;
654         }
655
656         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
657                 return 0;
658         }
659
660         return 1;
661 }
662
663 /*!
664  * \brief Add a subscriber to a topic.
665  * \param topic Topic
666  * \param sub Subscriber
667  * \return 0 on success
668  * \return Non-zero on error
669  */
670 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
671 {
672         size_t idx;
673         SCOPED_AO2LOCK(lock, topic);
674
675         /* The reference from the topic to the subscription is shared with
676          * the owner of the subscription, which will explicitly unsubscribe
677          * to release it.
678          *
679          * If we bumped the refcount here, the owner would have to unsubscribe
680          * and cleanup, which is a bit awkward. */
681         AST_VECTOR_APPEND(&topic->subscribers, sub);
682
683         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
684                 topic_add_subscription(
685                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
686         }
687
688         return 0;
689 }
690
691 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
692 {
693         size_t idx;
694         SCOPED_AO2LOCK(lock_topic, topic);
695
696         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
697                 topic_remove_subscription(
698                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
699         }
700
701         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
702                 AST_VECTOR_ELEM_CLEANUP_NOOP);
703 }
704
705 /*!
706  * \internal \brief Dispatch a message to a subscriber asynchronously
707  * \param local \ref ast_taskprocessor_local object
708  * \return 0
709  */
710 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
711 {
712         struct stasis_subscription *sub = local->local_data;
713         struct stasis_message *message = local->data;
714
715         subscription_invoke(sub, message);
716         ao2_cleanup(message);
717
718         return 0;
719 }
720
721 /*!
722  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
723  * a published message to a subscriber
724  */
725 struct sync_task_data {
726         ast_mutex_t lock;
727         ast_cond_t cond;
728         int complete;
729         void *task_data;
730 };
731
732 /*!
733  * \internal \brief Dispatch a message to a subscriber synchronously
734  * \param local \ref ast_taskprocessor_local object
735  * \return 0
736  */
737 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
738 {
739         struct stasis_subscription *sub = local->local_data;
740         struct sync_task_data *std = local->data;
741         struct stasis_message *message = std->task_data;
742
743         subscription_invoke(sub, message);
744         ao2_cleanup(message);
745
746         ast_mutex_lock(&std->lock);
747         std->complete = 1;
748         ast_cond_signal(&std->cond);
749         ast_mutex_unlock(&std->lock);
750
751         return 0;
752 }
753
754 /*!
755  * \internal \brief Dispatch a message to a subscriber
756  * \param sub The subscriber to dispatch to
757  * \param message The message to send
758  * \param synchronous If non-zero, synchronize on the subscriber receiving
759  * the message
760  */
761 static void dispatch_message(struct stasis_subscription *sub,
762         struct stasis_message *message,
763         int synchronous)
764 {
765         if (!sub->mailbox) {
766                 /* Dispatch directly */
767                 subscription_invoke(sub, message);
768                 return;
769         }
770
771         /* Bump the message for the taskprocessor push. This will get de-ref'd
772          * by the task processor callback.
773          */
774         ao2_bump(message);
775         if (!synchronous) {
776                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
777                         /* Push failed; ugh. */
778                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
779                         ao2_cleanup(message);
780                 }
781         } else {
782                 struct sync_task_data std;
783
784                 ast_mutex_init(&std.lock);
785                 ast_cond_init(&std.cond, NULL);
786                 std.complete = 0;
787                 std.task_data = message;
788
789                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
790                         /* Push failed; ugh. */
791                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
792                         ao2_cleanup(message);
793                         ast_mutex_destroy(&std.lock);
794                         ast_cond_destroy(&std.cond);
795                         return;
796                 }
797
798                 ast_mutex_lock(&std.lock);
799                 while (!std.complete) {
800                         ast_cond_wait(&std.cond, &std.lock);
801                 }
802                 ast_mutex_unlock(&std.lock);
803
804                 ast_mutex_destroy(&std.lock);
805                 ast_cond_destroy(&std.cond);
806         }
807 }
808
809 /*!
810  * \internal \brief Publish a message to a topic's subscribers
811  * \brief topic The topic to publish to
812  * \brief message The message to publish
813  * \brief sync_sub An optional subscriber of the topic to publish synchronously
814  * to
815  */
816 static void publish_msg(struct stasis_topic *topic,
817         struct stasis_message *message, struct stasis_subscription *sync_sub)
818 {
819         size_t i;
820
821         ast_assert(topic != NULL);
822         ast_assert(message != NULL);
823
824         /*
825          * The topic may be unref'ed by the subscription invocation.
826          * Make sure we hold onto a reference while dispatching.
827          */
828         ao2_ref(topic, +1);
829         ao2_lock(topic);
830         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
831                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
832
833                 ast_assert(sub != NULL);
834
835                 dispatch_message(sub, message, (sub == sync_sub));
836         }
837         ao2_unlock(topic);
838         ao2_ref(topic, -1);
839 }
840
841 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
842 {
843         publish_msg(topic, message, NULL);
844 }
845
846 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
847 {
848         ast_assert(sub != NULL);
849
850         publish_msg(sub->topic, message, sub);
851 }
852
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * Lock ordering: whenever both topics must be locked, take the
 * \a to_topic lock first and the \a from_topic lock second. Any other
 * order risks a deadlock.
 */
struct stasis_forward {
	/*! Topic the messages originate from */
	struct stasis_topic *from_topic;
	/*! Topic the messages are forwarded to */
	struct stasis_topic *to_topic;
};
867
868 static void forward_dtor(void *obj)
869 {
870         struct stasis_forward *forward = obj;
871
872         ao2_cleanup(forward->from_topic);
873         forward->from_topic = NULL;
874         ao2_cleanup(forward->to_topic);
875         forward->to_topic = NULL;
876 }
877
878 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
879 {
880         int idx;
881         struct stasis_topic *from;
882         struct stasis_topic *to;
883
884         if (!forward) {
885                 return NULL;
886         }
887
888         from = forward->from_topic;
889         to = forward->to_topic;
890
891         if (from && to) {
892                 topic_lock_both(to, from);
893                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
894                         AST_VECTOR_ELEM_CLEANUP_NOOP);
895
896                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
897                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
898                 }
899                 ao2_unlock(from);
900                 ao2_unlock(to);
901         }
902
903         ao2_cleanup(forward);
904
905         return NULL;
906 }
907
908 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
909         struct stasis_topic *to_topic)
910 {
911         int res;
912         size_t idx;
913         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
914
915         if (!from_topic || !to_topic) {
916                 return NULL;
917         }
918
919         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
920         if (!forward) {
921                 return NULL;
922         }
923
924         /* Forwards to ourselves are implicit. */
925         if (to_topic == from_topic) {
926                 return ao2_bump(forward);
927         }
928
929         forward->from_topic = ao2_bump(from_topic);
930         forward->to_topic = ao2_bump(to_topic);
931
932         topic_lock_both(to_topic, from_topic);
933         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
934         if (res != 0) {
935                 ao2_unlock(from_topic);
936                 ao2_unlock(to_topic);
937                 return NULL;
938         }
939
940         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
941                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
942         }
943         ao2_unlock(from_topic);
944         ao2_unlock(to_topic);
945
946         return ao2_bump(forward);
947 }
948
949 static void subscription_change_dtor(void *obj)
950 {
951         struct stasis_subscription_change *change = obj;
952
953         ast_string_field_free_memory(change);
954         ao2_cleanup(change->topic);
955 }
956
957 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
958 {
959         struct stasis_subscription_change *change;
960
961         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
962         if (!change || ast_string_field_init(change, 128)) {
963                 ao2_cleanup(change);
964                 return NULL;
965         }
966
967         ast_string_field_set(change, uniqueid, uniqueid);
968         ast_string_field_set(change, description, description);
969         ao2_ref(topic, +1);
970         change->topic = topic;
971
972         return change;
973 }
974
975 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
976 {
977         struct stasis_subscription_change *change;
978         struct stasis_message *msg;
979
980         /* This assumes that we have already unsubscribed */
981         ast_assert(stasis_subscription_is_subscribed(sub));
982
983         if (!stasis_subscription_change_type()) {
984                 return;
985         }
986
987         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
988         if (!change) {
989                 return;
990         }
991
992         msg = stasis_message_create(stasis_subscription_change_type(), change);
993         if (!msg) {
994                 ao2_cleanup(change);
995                 return;
996         }
997
998         stasis_publish(topic, msg);
999         ao2_cleanup(msg);
1000         ao2_cleanup(change);
1001 }
1002
1003 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1004         struct stasis_subscription *sub)
1005 {
1006         struct stasis_subscription_change *change;
1007         struct stasis_message *msg;
1008
1009         /* This assumes that we have already unsubscribed */
1010         ast_assert(!stasis_subscription_is_subscribed(sub));
1011
1012         if (!stasis_subscription_change_type()) {
1013                 return;
1014         }
1015
1016         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1017         if (!change) {
1018                 return;
1019         }
1020
1021         msg = stasis_message_create(stasis_subscription_change_type(), change);
1022         if (!msg) {
1023                 ao2_cleanup(change);
1024                 return;
1025         }
1026
1027         stasis_publish(topic, msg);
1028
1029         /* Now we have to dispatch to the subscription itself */
1030         dispatch_message(sub, msg, 0);
1031
1032         ao2_cleanup(msg);
1033         ao2_cleanup(change);
1034 }
1035
/*! \brief A topic held in a topic pool, with its forward to the pool's topic */
struct topic_pool_entry {
	/*! Forward from \c topic to the pool's topic */
	struct stasis_forward *forward;
	/*! The pooled topic */
	struct stasis_topic *topic;
};
1040
1041 static void topic_pool_entry_dtor(void *obj)
1042 {
1043         struct topic_pool_entry *entry = obj;
1044
1045         entry->forward = stasis_forward_cancel(entry->forward);
1046         ao2_cleanup(entry->topic);
1047         entry->topic = NULL;
1048 }
1049
1050 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1051 {
1052         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1053                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1054 }
1055
/*! \brief A pool of named topics that all forward to one common topic */
struct stasis_topic_pool {
	/*! Container of \ref topic_pool_entry objects */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic forwards to */
	struct stasis_topic *pool_topic;
};
1060
1061 static void topic_pool_dtor(void *obj)
1062 {
1063         struct stasis_topic_pool *pool = obj;
1064
1065         ao2_cleanup(pool->pool_container);
1066         pool->pool_container = NULL;
1067         ao2_cleanup(pool->pool_topic);
1068         pool->pool_topic = NULL;
1069 }
1070
1071 static int topic_pool_entry_hash(const void *obj, const int flags)
1072 {
1073         const struct topic_pool_entry *object;
1074         const char *key;
1075
1076         switch (flags & OBJ_SEARCH_MASK) {
1077         case OBJ_SEARCH_KEY:
1078                 key = obj;
1079                 break;
1080         case OBJ_SEARCH_OBJECT:
1081                 object = obj;
1082                 key = stasis_topic_name(object->topic);
1083                 break;
1084         default:
1085                 /* Hash can only work on something with a full key. */
1086                 ast_assert(0);
1087                 return 0;
1088         }
1089         return ast_str_case_hash(key);
1090 }
1091
1092 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1093 {
1094         const struct topic_pool_entry *object_left = obj;
1095         const struct topic_pool_entry *object_right = arg;
1096         const char *right_key = arg;
1097         int cmp;
1098
1099         switch (flags & OBJ_SEARCH_MASK) {
1100         case OBJ_SEARCH_OBJECT:
1101                 right_key = stasis_topic_name(object_right->topic);
1102                 /* Fall through */
1103         case OBJ_SEARCH_KEY:
1104                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1105                 break;
1106         case OBJ_SEARCH_PARTIAL_KEY:
1107                 /* Not supported by container */
1108                 ast_assert(0);
1109                 cmp = -1;
1110                 break;
1111         default:
1112                 /*
1113                  * What arg points to is specific to this traversal callback
1114                  * and has no special meaning to astobj2.
1115                  */
1116                 cmp = 0;
1117                 break;
1118         }
1119         if (cmp) {
1120                 return 0;
1121         }
1122         /*
1123          * At this point the traversal callback is identical to a sorted
1124          * container.
1125          */
1126         return CMP_MATCH;
1127 }
1128
1129 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1130 {
1131         struct stasis_topic_pool *pool;
1132
1133         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1134         if (!pool) {
1135                 return NULL;
1136         }
1137
1138         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1139                 topic_pool_entry_hash, topic_pool_entry_cmp);
1140         if (!pool->pool_container) {
1141                 ao2_cleanup(pool);
1142                 return NULL;
1143         }
1144         ao2_ref(pooled_topic, +1);
1145         pool->pool_topic = pooled_topic;
1146
1147         return pool;
1148 }
1149
1150 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1151 {
1152         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1153         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1154
1155         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1156         if (topic_pool_entry) {
1157                 return topic_pool_entry->topic;
1158         }
1159
1160         topic_pool_entry = topic_pool_entry_alloc();
1161         if (!topic_pool_entry) {
1162                 return NULL;
1163         }
1164
1165         topic_pool_entry->topic = stasis_topic_create(topic_name);
1166         if (!topic_pool_entry->topic) {
1167                 return NULL;
1168         }
1169
1170         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1171         if (!topic_pool_entry->forward) {
1172                 return NULL;
1173         }
1174
1175         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1176                 return NULL;
1177         }
1178
1179         return topic_pool_entry->topic;
1180 }
1181
/*!
 * \brief Log use of a message type accessor outside the type's lifetime.
 *
 * The error is only logged when AST_DEVMODE is defined; otherwise this
 * function does nothing.
 *
 * \param name Name of the accessor function that was called
 */
void stasis_log_bad_type_access(const char *name)
{
#if defined(AST_DEVMODE)
	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1188
1189 /*! \brief A multi object blob data structure to carry user event stasis messages */
1190 struct ast_multi_object_blob {
1191         struct ast_json *blob;                             /*< A blob of JSON data */
1192         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1193 };
1194
1195 /*!
1196  * \internal
1197  * \brief Destructor for \ref ast_multi_object_blob objects
1198  */
1199 static void multi_object_blob_dtor(void *obj)
1200 {
1201         struct ast_multi_object_blob *multi = obj;
1202         int type;
1203         int i;
1204
1205         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1206                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1207                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1208                 }
1209                 AST_VECTOR_FREE(&multi->snapshots[type]);
1210         }
1211         ast_json_unref(multi->blob);
1212 }
1213
1214 /*! \brief Create a stasis user event multi object blob */
1215 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1216 {
1217         int type;
1218         RAII_VAR(struct ast_multi_object_blob *, multi,
1219                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1220                         ao2_cleanup);
1221
1222         ast_assert(blob != NULL);
1223
1224         if (!multi) {
1225                 return NULL;
1226         }
1227
1228         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1229                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1230                         return NULL;
1231                 }
1232         }
1233
1234         multi->blob = ast_json_ref(blob);
1235
1236         ao2_ref(multi, +1);
1237         return multi;
1238 }
1239
1240 /*! \brief Add an object (snapshot) to the blob */
1241 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1242         enum stasis_user_multi_object_snapshot_type type, void *object)
1243 {
1244         if (!multi || !object) {
1245                 return;
1246         }
1247         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1248 }
1249
1250 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1251 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1252         struct stasis_message_type *type, struct ast_json *blob)
1253 {
1254         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1255         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1256         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1257
1258         if (!type) {
1259                 return;
1260         }
1261
1262         multi = ast_multi_object_blob_create(blob);
1263         if (!multi) {
1264                 return;
1265         }
1266
1267         channel_snapshot = ast_channel_snapshot_create(chan);
1268         ao2_ref(channel_snapshot, +1);
1269         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1270
1271         message = stasis_message_create(type, multi);
1272         if (message) {
1273                 /* app_userevent still publishes to channel */
1274                 stasis_publish(ast_channel_topic(chan), message);
1275         }
1276 }
1277
1278 /*! \internal \brief convert multi object blob to ari json */
1279 static struct ast_json *multi_user_event_to_json(
1280         struct stasis_message *message,
1281         const struct stasis_message_sanitizer *sanitize)
1282 {
1283         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1284         struct ast_multi_object_blob *multi = stasis_message_data(message);
1285         struct ast_json *blob = multi->blob;
1286         const struct timeval *tv = stasis_message_timestamp(message);
1287         enum stasis_user_multi_object_snapshot_type type;
1288         int i;
1289
1290         out = ast_json_object_create();
1291         if (!out) {
1292                 return NULL;
1293         }
1294
1295         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1296         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1297         ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
1298         ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
1299
1300         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1301                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1302                         struct ast_json *json_object = NULL;
1303                         char *name = NULL;
1304                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1305
1306                         switch (type) {
1307                         case STASIS_UMOS_CHANNEL:
1308                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1309                                 name = "channel";
1310                                 break;
1311                         case STASIS_UMOS_BRIDGE:
1312                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1313                                 name = "bridge";
1314                                 break;
1315                         case STASIS_UMOS_ENDPOINT:
1316                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1317                                 name = "endpoint";
1318                                 break;
1319                         }
1320                         if (json_object) {
1321                                 ast_json_object_set(out, name, json_object);
1322                         }
1323                 }
1324         }
1325         return ast_json_ref(out);
1326 }
1327
1328 /*! \internal \brief convert multi object blob to ami string */
1329 static struct ast_str *multi_object_blob_to_ami(void *obj)
1330 {
1331         struct ast_str *ami_str=ast_str_create(1024);
1332         struct ast_str *ami_snapshot;
1333         const struct ast_multi_object_blob *multi = obj;
1334         enum stasis_user_multi_object_snapshot_type type;
1335         int i;
1336
1337         if (!ami_str) {
1338                 return NULL;
1339         }
1340         if (!multi) {
1341                 ast_free(ami_str);
1342                 return NULL;
1343         }
1344
1345         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1346                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1347                         char *name = "";
1348                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1349                         ami_snapshot = NULL;
1350
1351                         if (i > 0) {
1352                                 ast_asprintf(&name, "%d", i + 1);
1353                         }
1354
1355                         switch (type) {
1356                         case STASIS_UMOS_CHANNEL:
1357                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1358                                 break;
1359
1360                         case STASIS_UMOS_BRIDGE:
1361                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1362                                 break;
1363
1364                         case STASIS_UMOS_ENDPOINT:
1365                                 /* currently not sending endpoint snapshots to AMI */
1366                                 break;
1367                         }
1368                         if (ami_snapshot) {
1369                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1370                                 ast_free(ami_snapshot);
1371                         }
1372                 }
1373         }
1374
1375         return ami_str;
1376 }
1377
/*!
 * \internal
 * \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key being considered
 *
 * \retval 1 if \a key is exactly "eventname"
 * \retval 0 for any other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0;
}
1386
1387 static struct ast_manager_event_blob *multi_user_event_to_ami(
1388         struct stasis_message *message)
1389 {
1390         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1391         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1392         struct ast_multi_object_blob *multi = stasis_message_data(message);
1393         const char *eventname;
1394
1395         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1396         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1397         object_string = multi_object_blob_to_ami(multi);
1398         if (!object_string || !body) {
1399                 return NULL;
1400         }
1401
1402         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1403                 "%s"
1404                 "UserEvent: %s\r\n"
1405                 "%s",
1406                 ast_str_buffer(object_string),
1407                 eventname,
1408                 ast_str_buffer(body));
1409 }
1410
/*! \brief A structure to hold global configuration-related options */
struct stasis_declined_config {
	/*! Container holding the names of the message types to decline */
	struct ao2_container *declined;
};
1416
/*! \brief Threadpool configuration options */
struct stasis_threadpool_conf {
	/*! Initial size of the thread pool */
	int initial_size;
	/*! Time, in seconds, before we expire a thread */
	int idle_timeout_sec;
	/*! Maximum number of threads to allow */
	int max_size;
};
1426
/*! \brief Top level stasis configuration object */
struct stasis_config {
	/*! Thread pool configuration options */
	struct stasis_threadpool_conf *threadpool_options;
	/*! Declined message types */
	struct stasis_declined_config *declined_message_types;
};
1433
1434 static struct aco_type threadpool_option = {
1435         .type = ACO_GLOBAL,
1436         .name = "threadpool",
1437         .item_offset = offsetof(struct stasis_config, threadpool_options),
1438         .category = "^threadpool$",
1439         .category_match = ACO_WHITELIST,
1440 };
1441
1442 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1443
1444 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1445 static struct aco_type declined_option = {
1446         .type = ACO_GLOBAL,
1447         .name = "declined_message_types",
1448         .item_offset = offsetof(struct stasis_config, declined_message_types),
1449         .category_match = ACO_WHITELIST,
1450         .category = "^declined_message_types$",
1451 };
1452
1453 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1454
1455 struct aco_file stasis_conf = {
1456         .filename = "stasis.conf",
1457         .types = ACO_TYPES(&declined_option, &threadpool_option),
1458 };
1459
1460 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1461 static AO2_GLOBAL_OBJ_STATIC(globals);
1462
1463 static void *stasis_config_alloc(void);
1464
1465 /*! \brief Register information about the configs being processed by this module */
1466 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1467         .files = ACO_FILES(&stasis_conf),
1468 );
1469
1470 static void stasis_declined_config_destructor(void *obj)
1471 {
1472         struct stasis_declined_config *declined = obj;
1473
1474         ao2_cleanup(declined->declined);
1475 }
1476
1477 static void stasis_config_destructor(void *obj)
1478 {
1479         struct stasis_config *cfg = obj;
1480
1481         ao2_cleanup(cfg->declined_message_types);
1482         ast_free(cfg->threadpool_options);
1483 }
1484
1485 static void *stasis_config_alloc(void)
1486 {
1487         struct stasis_config *cfg;
1488
1489         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1490                 return NULL;
1491         }
1492
1493         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1494         if (!cfg->threadpool_options) {
1495                 ao2_ref(cfg, -1);
1496                 return NULL;
1497         }
1498
1499         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1500                 stasis_declined_config_destructor);
1501         if (!cfg->declined_message_types) {
1502                 ao2_ref(cfg, -1);
1503                 return NULL;
1504         }
1505
1506         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1507         if (!cfg->declined_message_types->declined) {
1508                 ao2_ref(cfg, -1);
1509                 return NULL;
1510         }
1511
1512         return cfg;
1513 }
1514
1515 int stasis_message_type_declined(const char *name)
1516 {
1517         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1518         char *name_in_declined;
1519         int res;
1520
1521         if (!cfg || !cfg->declined_message_types) {
1522                 return 0;
1523         }
1524
1525         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1526         res = name_in_declined ? 1 : 0;
1527         ao2_cleanup(name_in_declined);
1528         if (res) {
1529                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1530         }
1531         return res;
1532 }
1533
1534 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1535 {
1536         struct stasis_declined_config *declined = obj;
1537
1538         if (ast_strlen_zero(var->value)) {
1539                 return 0;
1540         }
1541
1542         if (ast_str_container_add(declined->declined, var->value)) {
1543                 return -1;
1544         }
1545
1546         return 0;
1547 }
1548
1549 /*!
1550  * @{ \brief Define multi user event message type(s).
1551  */
1552
1553 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1554         .to_json = multi_user_event_to_json,
1555         .to_ami = multi_user_event_to_ami,
1556         );
1557
1558 /*! @} */
1559
/*!
 * \brief Cleanup function for graceful shutdowns
 *
 * Releases what stasis_init() sets up: the thread pool, the two message
 * types it initializes, the config info and the global config object.
 */
static void stasis_cleanup(void)
{
	/* Shut the pool down, then clear the pointer so it no longer refers to it */
	ast_threadpool_shutdown(pool);
	pool = NULL;
	/* Message types that stasis_init() initializes */
	STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
	STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
	/* Config framework state, then the reference held by the global holder */
	aco_info_destroy(&cfg_info);
	ao2_global_obj_release(globals);
}
1570
1571 int stasis_init(void)
1572 {
1573         RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
1574         int cache_init;
1575         struct ast_threadpool_options threadpool_opts = { 0, };
1576
1577         /* Be sure the types are cleaned up after the message bus */
1578         ast_register_cleanup(stasis_cleanup);
1579
1580         if (aco_info_init(&cfg_info)) {
1581                 return -1;
1582         }
1583
1584         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1585                 declined_options, "", declined_handler, 0);
1586         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1587                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1588                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1589                 INT_MAX);
1590         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1591                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1592                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1593                 INT_MAX);
1594         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1595                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1596                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1597                 INT_MAX);
1598
1599         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1600                 struct stasis_config *default_cfg = stasis_config_alloc();
1601
1602                 if (!default_cfg) {
1603                         return -1;
1604                 }
1605
1606                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1607                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1608                         ao2_ref(default_cfg, -1);
1609                         return -1;
1610                 }
1611
1612                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1613                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1614                         return -1;
1615                 }
1616
1617                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1618                 ao2_global_obj_replace_unref(globals, default_cfg);
1619                 cfg = default_cfg;
1620         } else {
1621                 cfg = ao2_global_obj_ref(globals);
1622                 if (!cfg) {
1623                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1624                         return -1;
1625                 }
1626         }
1627
1628         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1629         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1630         threadpool_opts.auto_increment = 1;
1631         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1632         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1633         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1634         if (!pool) {
1635                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1636                 return -1;
1637         }
1638
1639         cache_init = stasis_cache_init();
1640         if (cache_init != 0) {
1641                 return -1;
1642         }
1643
1644         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1645                 return -1;
1646         }
1647         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1648                 return -1;
1649         }
1650
1651         return 0;
1652 }
1653