Stasis: Create human friendly taskprocessor/serializer names.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE();
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/threadpool.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/vector.h"
42 #include "asterisk/stasis_channels.h"
43 #include "asterisk/stasis_bridges.h"
44 #include "asterisk/stasis_endpoints.h"
45 #include "asterisk/config_options.h"
46
47 /*** DOCUMENTATION
48         <managerEvent language="en_US" name="UserEvent">
49                 <managerEventInstance class="EVENT_FLAG_USER">
50                         <synopsis>A user defined event raised from the dialplan.</synopsis>
51                         <syntax>
52                                 <channel_snapshot/>
53                                 <parameter name="UserEvent">
54                                         <para>The event name, as specified in the dialplan.</para>
55                                 </parameter>
56                         </syntax>
57                         <description>
58                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
59                         </description>
60                         <see-also>
61                                 <ref type="application">UserEvent</ref>
62                         </see-also>
63                 </managerEventInstance>
64         </managerEvent>
65         <configInfo name="stasis" language="en_US">
66                 <configFile name="stasis.conf">
67                         <configObject name="threadpool">
68                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69                                 <configOption name="initial_size" default="5">
70                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71                                 </configOption>
72                                 <configOption name="idle_timeout_sec" default="20">
73                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74                                 </configOption>
75                                 <configOption name="max_size" default="50">
76                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
77                                 </configOption>
78                         </configObject>
79                         <configObject name="declined_message_types">
80                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
81                                 <configOption name="decline">
82                                         <synopsis>The message type to decline.</synopsis>
83                                         <description>
84                                                 <para>This configuration option defines the name of the Stasis
85                                                 message type that Asterisk is forbidden from creating and can be
86                                                 specified as many times as necessary to achieve the desired result.</para>
87                                                 <enumlist>
88                                                         <enum name="stasis_app_recording_snapshot_type" />
89                                                         <enum name="stasis_app_playback_snapshot_type" />
90                                                         <enum name="stasis_test_message_type" />
91                                                         <enum name="confbridge_start_type" />
92                                                         <enum name="confbridge_end_type" />
93                                                         <enum name="confbridge_join_type" />
94                                                         <enum name="confbridge_leave_type" />
95                                                         <enum name="confbridge_start_record_type" />
96                                                         <enum name="confbridge_stop_record_type" />
97                                                         <enum name="confbridge_mute_type" />
98                                                         <enum name="confbridge_unmute_type" />
99                                                         <enum name="confbridge_talking_type" />
100                                                         <enum name="cel_generic_type" />
101                                                         <enum name="ast_bridge_snapshot_type" />
102                                                         <enum name="ast_bridge_merge_message_type" />
103                                                         <enum name="ast_channel_entered_bridge_type" />
104                                                         <enum name="ast_channel_left_bridge_type" />
105                                                         <enum name="ast_blind_transfer_type" />
106                                                         <enum name="ast_attended_transfer_type" />
107                                                         <enum name="ast_endpoint_snapshot_type" />
108                                                         <enum name="ast_endpoint_state_type" />
109                                                         <enum name="ast_device_state_message_type" />
110                                                         <enum name="ast_test_suite_message_type" />
111                                                         <enum name="ast_mwi_state_type" />
112                                                         <enum name="ast_mwi_vm_app_type" />
113                                                         <enum name="ast_format_register_type" />
114                                                         <enum name="ast_format_unregister_type" />
115                                                         <enum name="ast_manager_get_generic_type" />
116                                                         <enum name="ast_parked_call_type" />
117                                                         <enum name="ast_channel_snapshot_type" />
118                                                         <enum name="ast_channel_dial_type" />
119                                                         <enum name="ast_channel_varset_type" />
120                                                         <enum name="ast_channel_hangup_request_type" />
121                                                         <enum name="ast_channel_dtmf_begin_type" />
122                                                         <enum name="ast_channel_dtmf_end_type" />
123                                                         <enum name="ast_channel_hold_type" />
124                                                         <enum name="ast_channel_unhold_type" />
125                                                         <enum name="ast_channel_chanspy_start_type" />
126                                                         <enum name="ast_channel_chanspy_stop_type" />
127                                                         <enum name="ast_channel_fax_type" />
128                                                         <enum name="ast_channel_hangup_handler_type" />
129                                                         <enum name="ast_channel_moh_start_type" />
130                                                         <enum name="ast_channel_moh_stop_type" />
131                                                         <enum name="ast_channel_monitor_start_type" />
132                                                         <enum name="ast_channel_monitor_stop_type" />
133                                                         <enum name="ast_channel_agent_login_type" />
134                                                         <enum name="ast_channel_agent_logoff_type" />
135                                                         <enum name="ast_channel_talking_start" />
136                                                         <enum name="ast_channel_talking_stop" />
137                                                         <enum name="ast_security_event_type" />
138                                                         <enum name="ast_named_acl_change_type" />
139                                                         <enum name="ast_local_bridge_type" />
140                                                         <enum name="ast_local_optimization_begin_type" />
141                                                         <enum name="ast_local_optimization_end_type" />
142                                                         <enum name="stasis_subscription_change_type" />
143                                                         <enum name="ast_multi_user_event_type" />
144                                                         <enum name="stasis_cache_clear_type" />
145                                                         <enum name="stasis_cache_update_type" />
146                                                         <enum name="ast_network_change_type" />
147                                                         <enum name="ast_system_registry_type" />
148                                                         <enum name="ast_cc_available_type" />
149                                                         <enum name="ast_cc_offertimerstart_type" />
150                                                         <enum name="ast_cc_requested_type" />
151                                                         <enum name="ast_cc_requestacknowledged_type" />
152                                                         <enum name="ast_cc_callerstopmonitoring_type" />
153                                                         <enum name="ast_cc_callerstartmonitoring_type" />
154                                                         <enum name="ast_cc_callerrecalling_type" />
155                                                         <enum name="ast_cc_recallcomplete_type" />
156                                                         <enum name="ast_cc_failure_type" />
157                                                         <enum name="ast_cc_monitorfailed_type" />
158                                                         <enum name="ast_presence_state_message_type" />
159                                                         <enum name="ast_rtp_rtcp_sent_type" />
160                                                         <enum name="ast_rtp_rtcp_received_type" />
161                                                         <enum name="ast_call_pickup_type" />
162                                                         <enum name="aoc_s_type" />
163                                                         <enum name="aoc_d_type" />
164                                                         <enum name="aoc_e_type" />
165                                                         <enum name="dahdichannel_type" />
166                                                         <enum name="mcid_type" />
167                                                         <enum name="session_timeout_type" />
168                                                         <enum name="cdr_read_message_type" />
169                                                         <enum name="cdr_write_message_type" />
170                                                         <enum name="cdr_prop_write_message_type" />
171                                                         <enum name="corosync_ping_message_type" />
172                                                         <enum name="agi_exec_start_type" />
173                                                         <enum name="agi_exec_end_type" />
174                                                         <enum name="agi_async_start_type" />
175                                                         <enum name="agi_async_exec_type" />
176                                                         <enum name="agi_async_end_type" />
177                                                         <enum name="queue_caller_join_type" />
178                                                         <enum name="queue_caller_leave_type" />
179                                                         <enum name="queue_caller_abandon_type" />
180                                                         <enum name="queue_member_status_type" />
181                                                         <enum name="queue_member_added_type" />
182                                                         <enum name="queue_member_removed_type" />
183                                                         <enum name="queue_member_pause_type" />
184                                                         <enum name="queue_member_penalty_type" />
185                                                         <enum name="queue_member_ringinuse_type" />
186                                                         <enum name="queue_agent_called_type" />
187                                                         <enum name="queue_agent_connect_type" />
188                                                         <enum name="queue_agent_complete_type" />
189                                                         <enum name="queue_agent_dump_type" />
190                                                         <enum name="queue_agent_ringnoanswer_type" />
191                                                         <enum name="meetme_join_type" />
192                                                         <enum name="meetme_leave_type" />
193                                                         <enum name="meetme_end_type" />
194                                                         <enum name="meetme_mute_type" />
195                                                         <enum name="meetme_talking_type" />
196                                                         <enum name="meetme_talk_request_type" />
197                                                         <enum name="appcdr_message_type" />
198                                                         <enum name="forkcdr_message_type" />
199                                                         <enum name="cdr_sync_message_type" />
200                                                 </enumlist>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                 </configFile>
205         </configInfo>
206 ***/
207
208 /*!
209  * \page stasis-impl Stasis Implementation Notes
210  *
211  * \par Reference counting
212  *
213  * Stasis introduces a number of objects, which are tightly related to one
214  * another. Because we rely on ref-counting for memory management, understanding
215  * these relationships is important to understanding this code.
216  *
217  * \code{.txt}
218  *
219  *   stasis_topic <----> stasis_subscription
220  *             ^          ^
221  *              \        /
222  *               \      /
223  *               dispatch
224  *                  |
225  *                  |
226  *                  v
227  *            stasis_message
228  *                  |
229  *                  |
230  *                  v
231  *          stasis_message_type
232  *
233  * \endcode
234  *
235  * The most troubling thing in this chart is the cyclic reference between
236  * stasis_topic and stasis_subscription. This is both unfortunate, and
237  * necessary. Topics need the subscription in order to dispatch messages;
238  * subscriptions need the topics to unsubscribe and check subscription status.
239  *
240  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
241  * topic's reference to a subscription. When the subscription is destroyed, it
242  * will remove its reference to the topic.
243  *
244  * This means that until a subscription has been explicitly unsubscribed, it will
245  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
246  * The destructors of both have assertions regarding this to catch ref-counting
247  * problems where a subscription or topic has had an extra ao2_cleanup().
248  *
249  * The \ref dispatch object is a transient object, which is posted to a
250  * subscription's taskprocessor to send a message to the subscriber. They have
251  * short life cycles, allocated on one thread, destroyed on another.
252  *
253  * During shutdown, or the deletion of a domain object, there is a flurry of
254  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
255  * are processed. Any one of these cleanups could be the one to actually destroy
256  * a given object, so care must be taken to ensure that an object isn't
257  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
258  * that might happen when a RAII_VAR() goes out of scope.
259  *
260  * \par Typical life cycles
261  *
262  *  \li stasis_topic - There are several topics which live for the duration of
263  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
264  *      are actually fed by shorter-lived topics whose lifetime is associated
265  *      with some domain object (like ast_channel_topic() for a given
266  *      ast_channel).
267  *
268  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
269  *      topics, for similar reasons.
270  *
271  *  \li dispatch - Very short lived; just long enough to post a message to a
272  *      subscriber.
273  *
274  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
275  *      irrelevant. Messages are strictly data and have no behavior associated
276  *      with them, so it doesn't really matter if/when they are destroyed. By
277  *      design, a component could hold a ref to a message forever without any
278  *      ill consequences (aside from consuming more memory).
279  *
280  *  \li stasis_message_type - Long life cycles, typically only destroyed on
281  *      module unloading or _clean_ process exit.
282  *
283  * \par Subscriber shutdown sequencing
284  *
285  * Subscribers are sensitive to shutdown sequencing, specifically in how they
286  * reference message types. This is fully detailed on the wiki at
287  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
288  *
289  * In short, the lifetime of the \a data (and \a callback, if in a module) must
290  * be held until the stasis_subscription_final_message() has been received.
291  * Depending on the structure of the subscriber code, this can be handled by
292  * using stasis_subscription_final_message() to free resources on the final
293  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
294  * block until the unsubscribe has completed.
295  */
296
/*! Initial size of the subscribers list (passed to AST_VECTOR_INIT() when a
 *  topic is created). */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57

/*! Thread pool backing the serializers of subscriptions that don't want a
 *  dedicated taskprocessor (see internal_stasis_subscribe()). */
static struct ast_threadpool *pool;

/*! Message type that stasis_subscription_final_message() checks for when
 *  recognizing a subscription's own "Unsubscribe" notification. */
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
307
/*! \internal */
struct stasis_topic {
	/*! Heap-allocated copy of the topic's name; freed by the topic destructor. */
	char *name;
	/*! Variable length array of the subscribers */
	AST_VECTOR(, struct stasis_subscription *) subscribers;

	/*! Topics forwarding into this topic */
	AST_VECTOR(, struct stasis_topic *) upstream_topics;
};
317
/*
 * Forward declarations for the tightly-coupled subscription object.
 * Both helpers return 0 on success and non-zero on error; callers in this
 * file compare the result against 0.
 */
static int topic_add_subscription(struct stasis_topic *topic,
	struct stasis_subscription *sub);

static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
323
/*!
 * \brief Lock two topics.
 *
 * \a topic1 is locked unconditionally; \a topic2 is then acquired with a
 * trylock that is retried until it succeeds. Between attempts
 * AO2_DEADLOCK_AVOIDANCE() is applied to \a topic1 (presumably it briefly
 * releases and re-takes that lock so another thread can make progress --
 * confirm against astobj2.h).
 *
 * On completion both topics are locked by the caller.
 */
#define topic_lock_both(topic1, topic2) \
	do { \
		ao2_lock(topic1); \
		while (ao2_trylock(topic2)) { \
			AO2_DEADLOCK_AVOIDANCE(topic1); \
		} \
	} while (0)
332
333 static void topic_dtor(void *obj)
334 {
335         struct stasis_topic *topic = obj;
336
337         /* Subscribers hold a reference to topics, so they should all be
338          * unsubscribed before we get here. */
339         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
340
341         ast_free(topic->name);
342         topic->name = NULL;
343
344         AST_VECTOR_FREE(&topic->subscribers);
345         AST_VECTOR_FREE(&topic->upstream_topics);
346 }
347
348 struct stasis_topic *stasis_topic_create(const char *name)
349 {
350         struct stasis_topic *topic;
351         int res = 0;
352
353         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
354         if (!topic) {
355                 return NULL;
356         }
357
358         topic->name = ast_strdup(name);
359         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
360         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
361         if (!topic->name || res) {
362                 ao2_cleanup(topic);
363                 return NULL;
364         }
365
366         return topic;
367 }
368
369 const char *stasis_topic_name(const struct stasis_topic *topic)
370 {
371         return topic->name;
372 }
373
/*! \internal */
struct stasis_subscription {
	/*! Unique ID for this subscription */
	char uniqueid[AST_UUID_STR_LEN];
	/*! Topic subscribed to. The subscription holds a reference on it. */
	struct stasis_topic *topic;
	/*! Mailbox for processing incoming messages. */
	struct ast_taskprocessor *mailbox;
	/*! Callback function for incoming message processing. */
	stasis_subscription_cb callback;
	/*! Data pointer to be handed to the callback. */
	void *data;

	/*! Condition for joining with subscription. Used together with the
	 *  subscription's own ao2 lock. */
	ast_cond_t join_cond;
	/*! Flag set when final message for sub has been received.
	 *  Be sure the subscription's ao2 lock is held before reading/setting. */
	int final_message_rxed;
	/*! Flag set when final message for sub has been processed.
	 *  Be sure the subscription's ao2 lock is held before reading/setting. */
	int final_message_processed;
};
396
397 static void subscription_dtor(void *obj)
398 {
399         struct stasis_subscription *sub = obj;
400
401         /* Subscriptions need to be manually unsubscribed before destruction
402          * b/c there's a cyclic reference between topics and subscriptions */
403         ast_assert(!stasis_subscription_is_subscribed(sub));
404         /* If there are any messages in flight to this subscription; that would
405          * be bad. */
406         ast_assert(stasis_subscription_is_done(sub));
407
408         ao2_cleanup(sub->topic);
409         sub->topic = NULL;
410         ast_taskprocessor_unreference(sub->mailbox);
411         sub->mailbox = NULL;
412         ast_cond_destroy(&sub->join_cond);
413 }
414
415 /*!
416  * \brief Invoke the subscription's callback.
417  * \param sub Subscription to invoke.
418  * \param topic Topic message was published to.
419  * \param message Message to send.
420  */
421 static void subscription_invoke(struct stasis_subscription *sub,
422                                   struct stasis_message *message)
423 {
424         /* Notify that the final message has been received */
425         if (stasis_subscription_final_message(sub, message)) {
426                 SCOPED_AO2LOCK(lock, sub);
427
428                 sub->final_message_rxed = 1;
429                 ast_cond_signal(&sub->join_cond);
430         }
431
432         /* Since sub is mostly immutable, no need to lock sub */
433         sub->callback(sub->data, sub, message);
434
435         /* Notify that the final message has been processed */
436         if (stasis_subscription_final_message(sub, message)) {
437                 SCOPED_AO2LOCK(lock, sub);
438
439                 sub->final_message_processed = 1;
440                 ast_cond_signal(&sub->join_cond);
441         }
442 }
443
/* Forward declarations: helpers called on subscribe and on unsubscribe. */
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
446
/*!
 * \brief Subscription callback that does nothing.
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	/* Intentionally empty. */
}
450
451 struct stasis_subscription *internal_stasis_subscribe(
452         struct stasis_topic *topic,
453         stasis_subscription_cb callback,
454         void *data,
455         int needs_mailbox,
456         int use_thread_pool)
457 {
458         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
459
460         if (!topic) {
461                 return NULL;
462         }
463
464         /* The ao2 lock is used for join_cond. */
465         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
466         if (!sub) {
467                 return NULL;
468         }
469         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
470
471         if (needs_mailbox) {
472                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
473
474                 /* Create name with seq number appended. */
475                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
476                         use_thread_pool ? 'p' : 'm',
477                         stasis_topic_name(topic));
478
479                 /*
480                  * With a small number of subscribers, a thread-per-sub is
481                  * acceptable. For a large number of subscribers, a thread
482                  * pool should be used.
483                  */
484                 if (use_thread_pool) {
485                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
486                 } else {
487                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
488                 }
489                 if (!sub->mailbox) {
490                         return NULL;
491                 }
492                 ast_taskprocessor_set_local(sub->mailbox, sub);
493                 /* Taskprocessor has a reference */
494                 ao2_ref(sub, +1);
495         }
496
497         ao2_ref(topic, +1);
498         sub->topic = topic;
499         sub->callback = callback;
500         sub->data = data;
501         ast_cond_init(&sub->join_cond, NULL);
502
503         if (topic_add_subscription(topic, sub) != 0) {
504                 return NULL;
505         }
506         send_subscription_subscribe(topic, sub);
507
508         ao2_ref(sub, +1);
509         return sub;
510 }
511
512 struct stasis_subscription *stasis_subscribe(
513         struct stasis_topic *topic,
514         stasis_subscription_cb callback,
515         void *data)
516 {
517         return internal_stasis_subscribe(topic, callback, data, 1, 0);
518 }
519
520 struct stasis_subscription *stasis_subscribe_pool(
521         struct stasis_topic *topic,
522         stasis_subscription_cb callback,
523         void *data)
524 {
525         return internal_stasis_subscribe(topic, callback, data, 1, 1);
526 }
527
/*!
 * \internal
 * \brief Taskprocessor task that drops one reference on a subscription.
 * \param data The subscription to release.
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
	ao2_cleanup(data);

	return 0;
}
534
535 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
536 {
537         /* The subscription may be the last ref to this topic. Hold
538          * the topic ref open until after the unlock. */
539         RAII_VAR(struct stasis_topic *, topic,
540                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
541
542         if (!sub) {
543                 return NULL;
544         }
545
546         /* We have to remove the subscription first, to ensure the unsubscribe
547          * is the final message */
548         if (topic_remove_subscription(sub->topic, sub) != 0) {
549                 ast_log(LOG_ERROR,
550                         "Internal error: subscription has invalid topic\n");
551                 return NULL;
552         }
553
554         /* Now let everyone know about the unsubscribe */
555         send_subscription_unsubscribe(topic, sub);
556
557         /* When all that's done, remove the ref the mailbox has on the sub */
558         if (sub->mailbox) {
559                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
560         }
561
562         /* Unsubscribing unrefs the subscription */
563         ao2_cleanup(sub);
564         return NULL;
565 }
566
567 void stasis_subscription_join(struct stasis_subscription *subscription)
568 {
569         if (subscription) {
570                 SCOPED_AO2LOCK(lock, subscription);
571
572                 /* Wait until the processed flag has been set */
573                 while (!subscription->final_message_processed) {
574                         ast_cond_wait(&subscription->join_cond,
575                                 ao2_object_get_lockaddr(subscription));
576                 }
577         }
578 }
579
580 int stasis_subscription_is_done(struct stasis_subscription *subscription)
581 {
582         if (subscription) {
583                 SCOPED_AO2LOCK(lock, subscription);
584
585                 return subscription->final_message_rxed;
586         }
587
588         /* Null subscription is about as done as you can get */
589         return 1;
590 }
591
592 struct stasis_subscription *stasis_unsubscribe_and_join(
593         struct stasis_subscription *subscription)
594 {
595         if (!subscription) {
596                 return NULL;
597         }
598
599         /* Bump refcount to hold it past the unsubscribe */
600         ao2_ref(subscription, +1);
601         stasis_unsubscribe(subscription);
602         stasis_subscription_join(subscription);
603         /* Now decrement the refcount back */
604         ao2_cleanup(subscription);
605         return NULL;
606 }
607
608 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
609 {
610         if (sub) {
611                 size_t i;
612                 struct stasis_topic *topic = sub->topic;
613                 SCOPED_AO2LOCK(lock_topic, topic);
614
615                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
616                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
617                                 return 1;
618                         }
619                 }
620         }
621
622         return 0;
623 }
624
625 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
626 {
627         return sub->uniqueid;
628 }
629
630 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
631 {
632         struct stasis_subscription_change *change;
633
634         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
635                 return 0;
636         }
637
638         change = stasis_message_data(msg);
639         if (strcmp("Unsubscribe", change->description)) {
640                 return 0;
641         }
642
643         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
644                 return 0;
645         }
646
647         return 1;
648 }
649
650 /*!
651  * \brief Add a subscriber to a topic.
652  * \param topic Topic
653  * \param sub Subscriber
654  * \return 0 on success
655  * \return Non-zero on error
656  */
657 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
658 {
659         size_t idx;
660         SCOPED_AO2LOCK(lock, topic);
661
662         /* The reference from the topic to the subscription is shared with
663          * the owner of the subscription, which will explicitly unsubscribe
664          * to release it.
665          *
666          * If we bumped the refcount here, the owner would have to unsubscribe
667          * and cleanup, which is a bit awkward. */
668         AST_VECTOR_APPEND(&topic->subscribers, sub);
669
670         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
671                 topic_add_subscription(
672                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
673         }
674
675         return 0;
676 }
677
678 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
679 {
680         size_t idx;
681         SCOPED_AO2LOCK(lock_topic, topic);
682
683         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
684                 topic_remove_subscription(
685                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
686         }
687
688         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
689                 AST_VECTOR_ELEM_CLEANUP_NOOP);
690 }
691
692 /*!
693  * \internal \brief Dispatch a message to a subscriber asynchronously
694  * \param local \ref ast_taskprocessor_local object
695  * \return 0
696  */
697 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
698 {
699         struct stasis_subscription *sub = local->local_data;
700         struct stasis_message *message = local->data;
701
702         subscription_invoke(sub, message);
703         ao2_cleanup(message);
704
705         return 0;
706 }
707
708 /*!
709  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
710  * a published message to a subscriber
711  */
712 struct sync_task_data {
713         ast_mutex_t lock;
714         ast_cond_t cond;
715         int complete;
716         void *task_data;
717 };
718
719 /*!
720  * \internal \brief Dispatch a message to a subscriber synchronously
721  * \param local \ref ast_taskprocessor_local object
722  * \return 0
723  */
724 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
725 {
726         struct stasis_subscription *sub = local->local_data;
727         struct sync_task_data *std = local->data;
728         struct stasis_message *message = std->task_data;
729
730         subscription_invoke(sub, message);
731         ao2_cleanup(message);
732
733         ast_mutex_lock(&std->lock);
734         std->complete = 1;
735         ast_cond_signal(&std->cond);
736         ast_mutex_unlock(&std->lock);
737
738         return 0;
739 }
740
741 /*!
742  * \internal \brief Dispatch a message to a subscriber
743  * \param sub The subscriber to dispatch to
744  * \param message The message to send
745  * \param synchronous If non-zero, synchronize on the subscriber receiving
746  * the message
747  */
748 static void dispatch_message(struct stasis_subscription *sub,
749         struct stasis_message *message,
750         int synchronous)
751 {
752         if (!sub->mailbox) {
753                 /* Dispatch directly */
754                 subscription_invoke(sub, message);
755                 return;
756         }
757
758         /* Bump the message for the taskprocessor push. This will get de-ref'd
759          * by the task processor callback.
760          */
761         ao2_bump(message);
762         if (!synchronous) {
763                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
764                         /* Push failed; ugh. */
765                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
766                         ao2_cleanup(message);
767                 }
768         } else {
769                 struct sync_task_data std;
770
771                 ast_mutex_init(&std.lock);
772                 ast_cond_init(&std.cond, NULL);
773                 std.complete = 0;
774                 std.task_data = message;
775
776                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
777                         /* Push failed; ugh. */
778                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
779                         ao2_cleanup(message);
780                         ast_mutex_destroy(&std.lock);
781                         ast_cond_destroy(&std.cond);
782                         return;
783                 }
784
785                 ast_mutex_lock(&std.lock);
786                 while (!std.complete) {
787                         ast_cond_wait(&std.cond, &std.lock);
788                 }
789                 ast_mutex_unlock(&std.lock);
790
791                 ast_mutex_destroy(&std.lock);
792                 ast_cond_destroy(&std.cond);
793         }
794 }
795
796 /*!
797  * \internal \brief Publish a message to a topic's subscribers
798  * \brief topic The topic to publish to
799  * \brief message The message to publish
800  * \brief sync_sub An optional subscriber of the topic to publish synchronously
801  * to
802  */
803 static void publish_msg(struct stasis_topic *topic,
804         struct stasis_message *message, struct stasis_subscription *sync_sub)
805 {
806         size_t i;
807
808         ast_assert(topic != NULL);
809         ast_assert(message != NULL);
810
811         /*
812          * The topic may be unref'ed by the subscription invocation.
813          * Make sure we hold onto a reference while dispatching.
814          */
815         ao2_ref(topic, +1);
816         ao2_lock(topic);
817         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
818                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
819
820                 ast_assert(sub != NULL);
821
822                 dispatch_message(sub, message, (sub == sync_sub));
823         }
824         ao2_unlock(topic);
825         ao2_ref(topic, -1);
826 }
827
/*!
 * Publish \a message to every subscriber of \a topic, with no subscriber
 * singled out for synchronous delivery.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_subscription *no_sync_sub = NULL;

	publish_msg(topic, message, no_sync_sub);
}
832
833 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
834 {
835         ast_assert(sub != NULL);
836
837         publish_msg(sub->topic, message, sub);
838 }
839
/*!
 * \brief Forwarding information
 *
 * Describes a forward under which any message posted to \a from_topic is
 * also delivered to \a to_topic.
 *
 * Lock ordering: when both topics must be locked, lock \a to_topic first
 * and \a from_topic second. Any other order risks deadlock.
 */
struct stasis_forward {
	/*! Topic whose messages are being forwarded */
	struct stasis_topic *from_topic;
	/*! Topic that receives the forwarded messages */
	struct stasis_topic *to_topic;
};
854
855 static void forward_dtor(void *obj)
856 {
857         struct stasis_forward *forward = obj;
858
859         ao2_cleanup(forward->from_topic);
860         forward->from_topic = NULL;
861         ao2_cleanup(forward->to_topic);
862         forward->to_topic = NULL;
863 }
864
865 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
866 {
867         int idx;
868         struct stasis_topic *from;
869         struct stasis_topic *to;
870
871         if (!forward) {
872                 return NULL;
873         }
874
875         from = forward->from_topic;
876         to = forward->to_topic;
877
878         if (from && to) {
879                 topic_lock_both(to, from);
880                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
881                         AST_VECTOR_ELEM_CLEANUP_NOOP);
882
883                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
884                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
885                 }
886                 ao2_unlock(from);
887                 ao2_unlock(to);
888         }
889
890         ao2_cleanup(forward);
891
892         return NULL;
893 }
894
895 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
896         struct stasis_topic *to_topic)
897 {
898         int res;
899         size_t idx;
900         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
901
902         if (!from_topic || !to_topic) {
903                 return NULL;
904         }
905
906         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
907         if (!forward) {
908                 return NULL;
909         }
910
911         /* Forwards to ourselves are implicit. */
912         if (to_topic == from_topic) {
913                 return ao2_bump(forward);
914         }
915
916         forward->from_topic = ao2_bump(from_topic);
917         forward->to_topic = ao2_bump(to_topic);
918
919         topic_lock_both(to_topic, from_topic);
920         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
921         if (res != 0) {
922                 ao2_unlock(from_topic);
923                 ao2_unlock(to_topic);
924                 return NULL;
925         }
926
927         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
928                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
929         }
930         ao2_unlock(from_topic);
931         ao2_unlock(to_topic);
932
933         return ao2_bump(forward);
934 }
935
936 static void subscription_change_dtor(void *obj)
937 {
938         struct stasis_subscription_change *change = obj;
939
940         ast_string_field_free_memory(change);
941         ao2_cleanup(change->topic);
942 }
943
944 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
945 {
946         struct stasis_subscription_change *change;
947
948         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
949         if (!change || ast_string_field_init(change, 128)) {
950                 ao2_cleanup(change);
951                 return NULL;
952         }
953
954         ast_string_field_set(change, uniqueid, uniqueid);
955         ast_string_field_set(change, description, description);
956         ao2_ref(topic, +1);
957         change->topic = topic;
958
959         return change;
960 }
961
962 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
963 {
964         struct stasis_subscription_change *change;
965         struct stasis_message *msg;
966
967         /* This assumes that we have already unsubscribed */
968         ast_assert(stasis_subscription_is_subscribed(sub));
969
970         if (!stasis_subscription_change_type()) {
971                 return;
972         }
973
974         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
975         if (!change) {
976                 return;
977         }
978
979         msg = stasis_message_create(stasis_subscription_change_type(), change);
980         if (!msg) {
981                 ao2_cleanup(change);
982                 return;
983         }
984
985         stasis_publish(topic, msg);
986         ao2_cleanup(msg);
987         ao2_cleanup(change);
988 }
989
990 static void send_subscription_unsubscribe(struct stasis_topic *topic,
991         struct stasis_subscription *sub)
992 {
993         struct stasis_subscription_change *change;
994         struct stasis_message *msg;
995
996         /* This assumes that we have already unsubscribed */
997         ast_assert(!stasis_subscription_is_subscribed(sub));
998
999         if (!stasis_subscription_change_type()) {
1000                 return;
1001         }
1002
1003         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1004         if (!change) {
1005                 return;
1006         }
1007
1008         msg = stasis_message_create(stasis_subscription_change_type(), change);
1009         if (!msg) {
1010                 ao2_cleanup(change);
1011                 return;
1012         }
1013
1014         stasis_publish(topic, msg);
1015
1016         /* Now we have to dispatch to the subscription itself */
1017         dispatch_message(sub, msg, 0);
1018
1019         ao2_cleanup(msg);
1020         ao2_cleanup(change);
1021 }
1022
/*! One entry of a topic pool: a topic and the forward created for it. */
struct topic_pool_entry {
	/*! Forward from \c topic to the pool's topic */
	struct stasis_forward *forward;
	/*! The pooled topic itself */
	struct stasis_topic *topic;
};
1027
1028 static void topic_pool_entry_dtor(void *obj)
1029 {
1030         struct topic_pool_entry *entry = obj;
1031
1032         entry->forward = stasis_forward_cancel(entry->forward);
1033         ao2_cleanup(entry->topic);
1034         entry->topic = NULL;
1035 }
1036
1037 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1038 {
1039         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1040                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1041 }
1042
/*! A pool of named topics that all forward to one common topic. */
struct stasis_topic_pool {
	/*! Container of \ref topic_pool_entry objects, keyed by topic name */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic forwards to */
	struct stasis_topic *pool_topic;
};
1047
1048 static void topic_pool_dtor(void *obj)
1049 {
1050         struct stasis_topic_pool *pool = obj;
1051
1052         ao2_cleanup(pool->pool_container);
1053         pool->pool_container = NULL;
1054         ao2_cleanup(pool->pool_topic);
1055         pool->pool_topic = NULL;
1056 }
1057
1058 static int topic_pool_entry_hash(const void *obj, const int flags)
1059 {
1060         const struct topic_pool_entry *object;
1061         const char *key;
1062
1063         switch (flags & OBJ_SEARCH_MASK) {
1064         case OBJ_SEARCH_KEY:
1065                 key = obj;
1066                 break;
1067         case OBJ_SEARCH_OBJECT:
1068                 object = obj;
1069                 key = stasis_topic_name(object->topic);
1070                 break;
1071         default:
1072                 /* Hash can only work on something with a full key. */
1073                 ast_assert(0);
1074                 return 0;
1075         }
1076         return ast_str_case_hash(key);
1077 }
1078
1079 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1080 {
1081         const struct topic_pool_entry *object_left = obj;
1082         const struct topic_pool_entry *object_right = arg;
1083         const char *right_key = arg;
1084         int cmp;
1085
1086         switch (flags & OBJ_SEARCH_MASK) {
1087         case OBJ_SEARCH_OBJECT:
1088                 right_key = stasis_topic_name(object_right->topic);
1089                 /* Fall through */
1090         case OBJ_SEARCH_KEY:
1091                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1092                 break;
1093         case OBJ_SEARCH_PARTIAL_KEY:
1094                 /* Not supported by container */
1095                 ast_assert(0);
1096                 cmp = -1;
1097                 break;
1098         default:
1099                 /*
1100                  * What arg points to is specific to this traversal callback
1101                  * and has no special meaning to astobj2.
1102                  */
1103                 cmp = 0;
1104                 break;
1105         }
1106         if (cmp) {
1107                 return 0;
1108         }
1109         /*
1110          * At this point the traversal callback is identical to a sorted
1111          * container.
1112          */
1113         return CMP_MATCH;
1114 }
1115
1116 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1117 {
1118         struct stasis_topic_pool *pool;
1119
1120         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1121         if (!pool) {
1122                 return NULL;
1123         }
1124
1125         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1126                 topic_pool_entry_hash, topic_pool_entry_cmp);
1127         if (!pool->pool_container) {
1128                 ao2_cleanup(pool);
1129                 return NULL;
1130         }
1131         ao2_ref(pooled_topic, +1);
1132         pool->pool_topic = pooled_topic;
1133
1134         return pool;
1135 }
1136
1137 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1138 {
1139         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1140         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1141
1142         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1143         if (topic_pool_entry) {
1144                 return topic_pool_entry->topic;
1145         }
1146
1147         topic_pool_entry = topic_pool_entry_alloc();
1148         if (!topic_pool_entry) {
1149                 return NULL;
1150         }
1151
1152         topic_pool_entry->topic = stasis_topic_create(topic_name);
1153         if (!topic_pool_entry->topic) {
1154                 return NULL;
1155         }
1156
1157         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1158         if (!topic_pool_entry->forward) {
1159                 return NULL;
1160         }
1161
1162         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1163                 return NULL;
1164         }
1165
1166         return topic_pool_entry->topic;
1167 }
1168
/*!
 * Log an error about a message type accessor being used before init or
 * after destruction.
 *
 * Logging only happens in AST_DEVMODE builds; otherwise this is a no-op.
 *
 * \param name Name of the accessor function that was called
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1175
1176 /*! \brief A multi object blob data structure to carry user event stasis messages */
1177 struct ast_multi_object_blob {
1178         struct ast_json *blob;                             /*< A blob of JSON data */
1179         AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*< Vector of snapshots for each type */
1180 };
1181
1182 /*!
1183  * \internal
1184  * \brief Destructor for \ref ast_multi_object_blob objects
1185  */
1186 static void multi_object_blob_dtor(void *obj)
1187 {
1188         struct ast_multi_object_blob *multi = obj;
1189         int type;
1190         int i;
1191
1192         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1193                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1194                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1195                 }
1196                 AST_VECTOR_FREE(&multi->snapshots[type]);
1197         }
1198         ast_json_unref(multi->blob);
1199 }
1200
1201 /*! \brief Create a stasis user event multi object blob */
1202 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1203 {
1204         int type;
1205         RAII_VAR(struct ast_multi_object_blob *, multi,
1206                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1207                         ao2_cleanup);
1208
1209         ast_assert(blob != NULL);
1210
1211         if (!multi) {
1212                 return NULL;
1213         }
1214
1215         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1216                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1217                         return NULL;
1218                 }
1219         }
1220
1221         multi->blob = ast_json_ref(blob);
1222
1223         ao2_ref(multi, +1);
1224         return multi;
1225 }
1226
1227 /*! \brief Add an object (snapshot) to the blob */
1228 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1229         enum stasis_user_multi_object_snapshot_type type, void *object)
1230 {
1231         if (!multi || !object) {
1232                 return;
1233         }
1234         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1235 }
1236
1237 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1238 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1239         struct stasis_message_type *type, struct ast_json *blob)
1240 {
1241         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1242         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1243         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1244
1245         if (!type) {
1246                 return;
1247         }
1248
1249         multi = ast_multi_object_blob_create(blob);
1250         if (!multi) {
1251                 return;
1252         }
1253
1254         channel_snapshot = ast_channel_snapshot_create(chan);
1255         ao2_ref(channel_snapshot, +1);
1256         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1257
1258         message = stasis_message_create(type, multi);
1259         if (message) {
1260                 /* app_userevent still publishes to channel */
1261                 stasis_publish(ast_channel_topic(chan), message);
1262         }
1263 }
1264
1265 /*! \internal \brief convert multi object blob to ari json */
1266 static struct ast_json *multi_user_event_to_json(
1267         struct stasis_message *message,
1268         const struct stasis_message_sanitizer *sanitize)
1269 {
1270         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1271         struct ast_multi_object_blob *multi = stasis_message_data(message);
1272         struct ast_json *blob = multi->blob;
1273         const struct timeval *tv = stasis_message_timestamp(message);
1274         enum stasis_user_multi_object_snapshot_type type;
1275         int i;
1276
1277         out = ast_json_object_create();
1278         if (!out) {
1279                 return NULL;
1280         }
1281
1282         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1283         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1284         ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
1285         ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
1286
1287         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1288                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1289                         struct ast_json *json_object = NULL;
1290                         char *name = NULL;
1291                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1292
1293                         switch (type) {
1294                         case STASIS_UMOS_CHANNEL:
1295                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1296                                 name = "channel";
1297                                 break;
1298                         case STASIS_UMOS_BRIDGE:
1299                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1300                                 name = "bridge";
1301                                 break;
1302                         case STASIS_UMOS_ENDPOINT:
1303                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1304                                 name = "endpoint";
1305                                 break;
1306                         }
1307                         if (json_object) {
1308                                 ast_json_object_set(out, name, json_object);
1309                         }
1310                 }
1311         }
1312         return ast_json_ref(out);
1313 }
1314
1315 /*! \internal \brief convert multi object blob to ami string */
1316 static struct ast_str *multi_object_blob_to_ami(void *obj)
1317 {
1318         struct ast_str *ami_str=ast_str_create(1024);
1319         struct ast_str *ami_snapshot;
1320         const struct ast_multi_object_blob *multi = obj;
1321         enum stasis_user_multi_object_snapshot_type type;
1322         int i;
1323
1324         if (!ami_str) {
1325                 return NULL;
1326         }
1327         if (!multi) {
1328                 ast_free(ami_str);
1329                 return NULL;
1330         }
1331
1332         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1333                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1334                         char *name = "";
1335                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1336                         ami_snapshot = NULL;
1337
1338                         if (i > 0) {
1339                                 ast_asprintf(&name, "%d", i + 1);
1340                         }
1341
1342                         switch (type) {
1343                         case STASIS_UMOS_CHANNEL:
1344                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1345                                 break;
1346
1347                         case STASIS_UMOS_BRIDGE:
1348                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1349                                 break;
1350
1351                         case STASIS_UMOS_ENDPOINT:
1352                                 /* currently not sending endpoint snapshots to AMI */
1353                                 break;
1354                         }
1355                         if (ami_snapshot) {
1356                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1357                                 ast_free(ami_snapshot);
1358                         }
1359                 }
1360         }
1361
1362         return ami_str;
1363 }
1364
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key JSON key being considered
 * \return 1 if \a key is exactly "eventname", 0 for any other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp("eventname", key) == 0;
}
1373
1374 static struct ast_manager_event_blob *multi_user_event_to_ami(
1375         struct stasis_message *message)
1376 {
1377         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1378         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1379         struct ast_multi_object_blob *multi = stasis_message_data(message);
1380         const char *eventname;
1381
1382         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1383         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1384         object_string = multi_object_blob_to_ami(multi);
1385         if (!object_string || !body) {
1386                 return NULL;
1387         }
1388
1389         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1390                 "%s"
1391                 "UserEvent: %s\r\n"
1392                 "%s",
1393                 ast_str_buffer(object_string),
1394                 eventname,
1395                 ast_str_buffer(body));
1396 }
1397
/*! \brief Configuration for the "declined_message_types" category */
struct stasis_declined_config {
	/*! Container holding the names of the message types to decline */
	struct ao2_container *declined;
};
1403
/*! \brief Configuration for the "threadpool" category */
struct stasis_threadpool_conf {
	/*! Number of threads the pool starts with */
	int initial_size;
	/*! Seconds a thread may sit idle before it is expired */
	int idle_timeout_sec;
	/*! Upper bound on the number of threads */
	int max_size;
};
1413
/*! \brief Top-level stasis configuration object */
struct stasis_config {
	/*! Thread pool settings */
	struct stasis_threadpool_conf *threadpool_options;
	/*! Message types that are declined */
	struct stasis_declined_config *declined_message_types;
};
1420
1421 static struct aco_type threadpool_option = {
1422         .type = ACO_GLOBAL,
1423         .name = "threadpool",
1424         .item_offset = offsetof(struct stasis_config, threadpool_options),
1425         .category = "^threadpool$",
1426         .category_match = ACO_WHITELIST,
1427 };
1428
1429 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1430
1431 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
1432 static struct aco_type declined_option = {
1433         .type = ACO_GLOBAL,
1434         .name = "declined_message_types",
1435         .item_offset = offsetof(struct stasis_config, declined_message_types),
1436         .category_match = ACO_WHITELIST,
1437         .category = "^declined_message_types$",
1438 };
1439
1440 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1441
1442 struct aco_file stasis_conf = {
1443         .filename = "stasis.conf",
1444         .types = ACO_TYPES(&declined_option, &threadpool_option),
1445 };
1446
1447 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
1448 static AO2_GLOBAL_OBJ_STATIC(globals);
1449
1450 static void *stasis_config_alloc(void);
1451
1452 /*! \brief Register information about the configs being processed by this module */
1453 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
1454         .files = ACO_FILES(&stasis_conf),
1455 );
1456
1457 static void stasis_declined_config_destructor(void *obj)
1458 {
1459         struct stasis_declined_config *declined = obj;
1460
1461         ao2_cleanup(declined->declined);
1462 }
1463
1464 static void stasis_config_destructor(void *obj)
1465 {
1466         struct stasis_config *cfg = obj;
1467
1468         ao2_cleanup(cfg->declined_message_types);
1469         ast_free(cfg->threadpool_options);
1470 }
1471
1472 static void *stasis_config_alloc(void)
1473 {
1474         struct stasis_config *cfg;
1475
1476         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1477                 return NULL;
1478         }
1479
1480         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1481         if (!cfg->threadpool_options) {
1482                 ao2_ref(cfg, -1);
1483                 return NULL;
1484         }
1485
1486         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1487                 stasis_declined_config_destructor);
1488         if (!cfg->declined_message_types) {
1489                 ao2_ref(cfg, -1);
1490                 return NULL;
1491         }
1492
1493         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1494         if (!cfg->declined_message_types->declined) {
1495                 ao2_ref(cfg, -1);
1496                 return NULL;
1497         }
1498
1499         return cfg;
1500 }
1501
1502 int stasis_message_type_declined(const char *name)
1503 {
1504         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1505         char *name_in_declined;
1506         int res;
1507
1508         if (!cfg || !cfg->declined_message_types) {
1509                 return 0;
1510         }
1511
1512         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1513         res = name_in_declined ? 1 : 0;
1514         ao2_cleanup(name_in_declined);
1515         if (res) {
1516                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1517         }
1518         return res;
1519 }
1520
1521 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1522 {
1523         struct stasis_declined_config *declined = obj;
1524
1525         if (ast_strlen_zero(var->value)) {
1526                 return 0;
1527         }
1528
1529         if (ast_str_container_add(declined->declined, var->value)) {
1530                 return -1;
1531         }
1532
1533         return 0;
1534 }
1535
1536 /*!
 * @{ \brief Define multi user event message type(s).
 *
 * ast_multi_user_event_type is given multi_user_event_to_json and
 * multi_user_event_to_ami as its to_json and to_ami callbacks. It is
 * initialized in stasis_init() and cleaned up in stasis_cleanup().
 */
1539
1540 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1541         .to_json = multi_user_event_to_json,
1542         .to_ami = multi_user_event_to_ami,
1543         );
1544
1545 /*! @} */
1546
1547 /*! \brief Cleanup function for graceful shutdowns */
1548 static void stasis_cleanup(void)
1549 {
1550         ast_threadpool_shutdown(pool);
1551         pool = NULL;
1552         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1553         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1554         aco_info_destroy(&cfg_info);
1555         ao2_global_obj_release(globals);
1556 }
1557
1558 int stasis_init(void)
1559 {
1560         RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
1561         int cache_init;
1562         struct ast_threadpool_options threadpool_opts = { 0, };
1563
1564         /* Be sure the types are cleaned up after the message bus */
1565         ast_register_cleanup(stasis_cleanup);
1566
1567         if (aco_info_init(&cfg_info)) {
1568                 return -1;
1569         }
1570
1571         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1572                 declined_options, "", declined_handler, 0);
1573         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1574                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1575                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1576                 INT_MAX);
1577         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1578                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1579                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1580                 INT_MAX);
1581         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1582                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1583                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1584                 INT_MAX);
1585
1586         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1587                 struct stasis_config *default_cfg = stasis_config_alloc();
1588
1589                 if (!default_cfg) {
1590                         return -1;
1591                 }
1592
1593                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1594                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1595                         ao2_ref(default_cfg, -1);
1596                         return -1;
1597                 }
1598
1599                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1600                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1601                         return -1;
1602                 }
1603
1604                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1605                 ao2_global_obj_replace_unref(globals, default_cfg);
1606                 cfg = default_cfg;
1607         } else {
1608                 cfg = ao2_global_obj_ref(globals);
1609                 if (!cfg) {
1610                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1611                         return -1;
1612                 }
1613         }
1614
1615         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1616         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1617         threadpool_opts.auto_increment = 1;
1618         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1619         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1620         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1621         if (!pool) {
1622                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1623                 return -1;
1624         }
1625
1626         cache_init = stasis_cache_init();
1627         if (cache_init != 0) {
1628                 return -1;
1629         }
1630
1631         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1632                 return -1;
1633         }
1634         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1635                 return -1;
1636         }
1637
1638         return 0;
1639 }
1640