stasis: Add setting subscription congestion levels.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE();
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/threadpool.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/vector.h"
42 #include "asterisk/stasis_channels.h"
43 #include "asterisk/stasis_bridges.h"
44 #include "asterisk/stasis_endpoints.h"
45 #include "asterisk/config_options.h"
46
47 /*** DOCUMENTATION
48         <managerEvent language="en_US" name="UserEvent">
49                 <managerEventInstance class="EVENT_FLAG_USER">
50                         <synopsis>A user defined event raised from the dialplan.</synopsis>
51                         <syntax>
52                                 <channel_snapshot/>
53                                 <parameter name="UserEvent">
54                                         <para>The event name, as specified in the dialplan.</para>
55                                 </parameter>
56                         </syntax>
57                         <description>
58                                 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots.  Multiple snapshots of the same type are prefixed with a numeric value.</para>
59                         </description>
60                         <see-also>
61                                 <ref type="application">UserEvent</ref>
62                         </see-also>
63                 </managerEventInstance>
64         </managerEvent>
65         <configInfo name="stasis" language="en_US">
66                 <configFile name="stasis.conf">
67                         <configObject name="threadpool">
68                                 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69                                 <configOption name="initial_size" default="5">
70                                         <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71                                 </configOption>
72                                 <configOption name="idle_timeout_sec" default="20">
73                                         <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74                                 </configOption>
75                                 <configOption name="max_size" default="50">
76                                         <synopsis>Maximum number of threads in the threadpool.</synopsis>
77                                 </configOption>
78                         </configObject>
79                         <configObject name="declined_message_types">
80                                 <synopsis>Stasis message types for which to decline creation.</synopsis>
81                                 <configOption name="decline">
82                                         <synopsis>The message type to decline.</synopsis>
83                                         <description>
84                                                 <para>This configuration option defines the name of the Stasis
85                                                 message type that Asterisk is forbidden from creating and can be
86                                                 specified as many times as necessary to achieve the desired result.</para>
87                                                 <enumlist>
88                                                         <enum name="stasis_app_recording_snapshot_type" />
89                                                         <enum name="stasis_app_playback_snapshot_type" />
90                                                         <enum name="stasis_test_message_type" />
91                                                         <enum name="confbridge_start_type" />
92                                                         <enum name="confbridge_end_type" />
93                                                         <enum name="confbridge_join_type" />
94                                                         <enum name="confbridge_leave_type" />
95                                                         <enum name="confbridge_start_record_type" />
96                                                         <enum name="confbridge_stop_record_type" />
97                                                         <enum name="confbridge_mute_type" />
98                                                         <enum name="confbridge_unmute_type" />
99                                                         <enum name="confbridge_talking_type" />
100                                                         <enum name="cel_generic_type" />
101                                                         <enum name="ast_bridge_snapshot_type" />
102                                                         <enum name="ast_bridge_merge_message_type" />
103                                                         <enum name="ast_channel_entered_bridge_type" />
104                                                         <enum name="ast_channel_left_bridge_type" />
105                                                         <enum name="ast_blind_transfer_type" />
106                                                         <enum name="ast_attended_transfer_type" />
107                                                         <enum name="ast_endpoint_snapshot_type" />
108                                                         <enum name="ast_endpoint_state_type" />
109                                                         <enum name="ast_device_state_message_type" />
110                                                         <enum name="ast_test_suite_message_type" />
111                                                         <enum name="ast_mwi_state_type" />
112                                                         <enum name="ast_mwi_vm_app_type" />
113                                                         <enum name="ast_format_register_type" />
114                                                         <enum name="ast_format_unregister_type" />
115                                                         <enum name="ast_manager_get_generic_type" />
116                                                         <enum name="ast_parked_call_type" />
117                                                         <enum name="ast_channel_snapshot_type" />
118                                                         <enum name="ast_channel_dial_type" />
119                                                         <enum name="ast_channel_varset_type" />
120                                                         <enum name="ast_channel_hangup_request_type" />
121                                                         <enum name="ast_channel_dtmf_begin_type" />
122                                                         <enum name="ast_channel_dtmf_end_type" />
123                                                         <enum name="ast_channel_hold_type" />
124                                                         <enum name="ast_channel_unhold_type" />
125                                                         <enum name="ast_channel_chanspy_start_type" />
126                                                         <enum name="ast_channel_chanspy_stop_type" />
127                                                         <enum name="ast_channel_fax_type" />
128                                                         <enum name="ast_channel_hangup_handler_type" />
129                                                         <enum name="ast_channel_moh_start_type" />
130                                                         <enum name="ast_channel_moh_stop_type" />
131                                                         <enum name="ast_channel_monitor_start_type" />
132                                                         <enum name="ast_channel_monitor_stop_type" />
133                                                         <enum name="ast_channel_agent_login_type" />
134                                                         <enum name="ast_channel_agent_logoff_type" />
135                                                         <enum name="ast_channel_talking_start" />
136                                                         <enum name="ast_channel_talking_stop" />
137                                                         <enum name="ast_security_event_type" />
138                                                         <enum name="ast_named_acl_change_type" />
139                                                         <enum name="ast_local_bridge_type" />
140                                                         <enum name="ast_local_optimization_begin_type" />
141                                                         <enum name="ast_local_optimization_end_type" />
142                                                         <enum name="stasis_subscription_change_type" />
143                                                         <enum name="ast_multi_user_event_type" />
144                                                         <enum name="stasis_cache_clear_type" />
145                                                         <enum name="stasis_cache_update_type" />
146                                                         <enum name="ast_network_change_type" />
147                                                         <enum name="ast_system_registry_type" />
148                                                         <enum name="ast_cc_available_type" />
149                                                         <enum name="ast_cc_offertimerstart_type" />
150                                                         <enum name="ast_cc_requested_type" />
151                                                         <enum name="ast_cc_requestacknowledged_type" />
152                                                         <enum name="ast_cc_callerstopmonitoring_type" />
153                                                         <enum name="ast_cc_callerstartmonitoring_type" />
154                                                         <enum name="ast_cc_callerrecalling_type" />
155                                                         <enum name="ast_cc_recallcomplete_type" />
156                                                         <enum name="ast_cc_failure_type" />
157                                                         <enum name="ast_cc_monitorfailed_type" />
158                                                         <enum name="ast_presence_state_message_type" />
159                                                         <enum name="ast_rtp_rtcp_sent_type" />
160                                                         <enum name="ast_rtp_rtcp_received_type" />
161                                                         <enum name="ast_call_pickup_type" />
162                                                         <enum name="aoc_s_type" />
163                                                         <enum name="aoc_d_type" />
164                                                         <enum name="aoc_e_type" />
165                                                         <enum name="dahdichannel_type" />
166                                                         <enum name="mcid_type" />
167                                                         <enum name="session_timeout_type" />
168                                                         <enum name="cdr_read_message_type" />
169                                                         <enum name="cdr_write_message_type" />
170                                                         <enum name="cdr_prop_write_message_type" />
171                                                         <enum name="corosync_ping_message_type" />
172                                                         <enum name="agi_exec_start_type" />
173                                                         <enum name="agi_exec_end_type" />
174                                                         <enum name="agi_async_start_type" />
175                                                         <enum name="agi_async_exec_type" />
176                                                         <enum name="agi_async_end_type" />
177                                                         <enum name="queue_caller_join_type" />
178                                                         <enum name="queue_caller_leave_type" />
179                                                         <enum name="queue_caller_abandon_type" />
180                                                         <enum name="queue_member_status_type" />
181                                                         <enum name="queue_member_added_type" />
182                                                         <enum name="queue_member_removed_type" />
183                                                         <enum name="queue_member_pause_type" />
184                                                         <enum name="queue_member_penalty_type" />
185                                                         <enum name="queue_member_ringinuse_type" />
186                                                         <enum name="queue_agent_called_type" />
187                                                         <enum name="queue_agent_connect_type" />
188                                                         <enum name="queue_agent_complete_type" />
189                                                         <enum name="queue_agent_dump_type" />
190                                                         <enum name="queue_agent_ringnoanswer_type" />
191                                                         <enum name="meetme_join_type" />
192                                                         <enum name="meetme_leave_type" />
193                                                         <enum name="meetme_end_type" />
194                                                         <enum name="meetme_mute_type" />
195                                                         <enum name="meetme_talking_type" />
196                                                         <enum name="meetme_talk_request_type" />
197                                                         <enum name="appcdr_message_type" />
198                                                         <enum name="forkcdr_message_type" />
199                                                         <enum name="cdr_sync_message_type" />
200                                                 </enumlist>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                 </configFile>
205         </configInfo>
206 ***/
207
208 /*!
209  * \page stasis-impl Stasis Implementation Notes
210  *
211  * \par Reference counting
212  *
213  * Stasis introduces a number of objects, which are tightly related to one
214  * another. Because we rely on ref-counting for memory management, understanding
215  * these relationships is important to understanding this code.
216  *
217  * \code{.txt}
218  *
219  *   stasis_topic <----> stasis_subscription
220  *             ^          ^
221  *              \        /
222  *               \      /
223  *               dispatch
224  *                  |
225  *                  |
226  *                  v
227  *            stasis_message
228  *                  |
229  *                  |
230  *                  v
231  *          stasis_message_type
232  *
233  * \endcode
234  *
235  * The most troubling thing in this chart is the cyclic reference between
236  * stasis_topic and stasis_subscription. This is both unfortunate, and
237  * necessary. Topics need the subscription in order to dispatch messages;
238  * subscriptions need the topics to unsubscribe and check subscription status.
239  *
240  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
241  * topic's reference to a subscription. When the subscription is destroyed, it
242  * will remove its reference to the topic.
243  *
244  * This means that until a subscription has been explicitly unsubscribed, it will
245  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
246  * The destructors of both have assertions regarding this to catch ref-counting
247  * problems where a subscription or topic has had an extra ao2_cleanup().
248  *
249  * The \ref dispatch object is a transient object, which is posted to a
250  * subscription's taskprocessor to send a message to the subscriber. They have
251  * short life cycles, allocated on one thread, destroyed on another.
252  *
253  * During shutdown, or the deletion of a domain object, there is a flurry of
254  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
255  * are processed. Any one of these cleanups could be the one to actually destroy
256  * a given object, so care must be taken to ensure that an object isn't
257  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
258  * that might happen when a RAII_VAR() goes out of scope.
259  *
260  * \par Typical life cycles
261  *
262  *  \li stasis_topic - There are several topics which live for the duration of
263  *      the Asterisk process (ast_channel_topic_all(), etc.) but most of these
264  *      are actually fed by shorter-lived topics whose lifetime is associated
265  *      with some domain object (like ast_channel_topic() for a given
266  *      ast_channel).
267  *
268  *  \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
269  *      topics, for similar reasons.
270  *
271  *  \li dispatch - Very short lived; just long enough to post a message to a
272  *      subscriber.
273  *
274  *  \li stasis_message - Short to intermediate lifetimes, but that is mostly
275  *      irrelevant. Messages are strictly data and have no behavior associated
276  *      with them, so it doesn't really matter if/when they are destroyed. By
277  *      design, a component could hold a ref to a message forever without any
278  *      ill consequences (aside from consuming more memory).
279  *
280  *  \li stasis_message_type - Long life cycles, typically only destroyed on
281  *      module unloading or _clean_ process exit.
282  *
283  * \par Subscriber shutdown sequencing
284  *
285  * Subscribers are sensitive to shutdown sequencing, specifically in how they
286  * reference message types. This is fully detailed on the wiki at
287  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
288  *
289  * In short, the lifetime of the \a data (and \a callback, if in a module) must
290  * be held until the stasis_subscription_final_message() has been received.
291  * Depending on the structure of the subscriber code, this can be handled by
292  * using stasis_subscription_final_message() to free resources on the final
293  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
294  * block until the unsubscribe has completed.
295  */
296
297 /*! Initial capacity requested for a topic's subscribers vector in stasis_topic_create(). */
298 #define INITIAL_SUBSCRIBERS_MAX 4
299
300 /*! The number of buckets to use for topic pools */
301 #define TOPIC_POOL_BUCKETS 57
302
303 /*! Thread pool backing the serializer mailboxes of subscriptions created with use_thread_pool set (see internal_stasis_subscribe()). */
304 static struct ast_threadpool *pool;
305
/*! Message type that stasis_subscription_final_message() compares against.
 *  NOTE(review): presumably the type published for subscribe/unsubscribe
 *  notifications -- confirm against send_subscription_subscribe() and
 *  send_subscription_unsubscribe(). */
306 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
307
308 /*! \internal \brief A named message topic together with its subscribers and the topics forwarding into it. */
309 struct stasis_topic {
310         char *name; /*!< Topic name; a private copy made in stasis_topic_create() and freed in topic_dtor(). */
311         /*! Variable length array of the subscribers. topic_add_subscription() appends entries without taking an extra reference. */
312         AST_VECTOR(, struct stasis_subscription *) subscribers;
313
314         /*! Topics forwarding into this topic */
315         AST_VECTOR(, struct stasis_topic *) upstream_topics;
316 };
317
318 /* Forward declarations for the tightly-coupled subscription object */
319 static int topic_add_subscription(struct stasis_topic *topic,
320         struct stasis_subscription *sub);
321
322 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
323
324 /*!
 * \brief Lock two topics.
 *
 * Takes the lock on \a topic1, then loops on ao2_trylock(\a topic2); each time
 * the try-lock reports failure (non-zero), AO2_DEADLOCK_AVOIDANCE(\a topic1)
 * runs before the next attempt. Both topics are locked once the macro
 * completes.
 *
 * NOTE(review): AO2_DEADLOCK_AVOIDANCE presumably releases and re-takes
 * \a topic1 briefly -- confirm in astobj2.h.
 */
325 #define topic_lock_both(topic1, topic2) \
326         do { \
327                 ao2_lock(topic1); \
328                 while (ao2_trylock(topic2)) { \
329                         AO2_DEADLOCK_AVOIDANCE(topic1); \
330                 } \
331         } while (0)
332
333 static void topic_dtor(void *obj)
334 {
335         struct stasis_topic *topic = obj;
336
337         /* Subscribers hold a reference to topics, so they should all be
338          * unsubscribed before we get here. */
339         ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
340
341         ast_free(topic->name);
342         topic->name = NULL;
343
344         AST_VECTOR_FREE(&topic->subscribers);
345         AST_VECTOR_FREE(&topic->upstream_topics);
346 }
347
348 struct stasis_topic *stasis_topic_create(const char *name)
349 {
350         struct stasis_topic *topic;
351         int res = 0;
352
353         topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
354         if (!topic) {
355                 return NULL;
356         }
357
358         topic->name = ast_strdup(name);
359         res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
360         res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
361         if (!topic->name || res) {
362                 ao2_cleanup(topic);
363                 return NULL;
364         }
365
366         return topic;
367 }
368
/*!
 * \brief Return the name of a topic.
 *
 * \param topic Topic to query; dereferenced unconditionally, so it must not
 *        be NULL.
 *
 * \return The topic's stored name string (owned by the topic, not a copy).
 */
369 const char *stasis_topic_name(const struct stasis_topic *topic)
370 {
371         return topic->name;
372 }
373
374 /*! \internal \brief State for one subscriber attached to a topic. */
375 struct stasis_subscription {
376         /*! Unique ID for this subscription; filled by ast_uuid_generate_str() at creation. */
377         char uniqueid[AST_UUID_STR_LEN];
378         /*! Topic subscribed to. The subscription holds a reference, released in subscription_dtor(). */
379         struct stasis_topic *topic;
380         /*! Mailbox for processing incoming messages. Stays NULL when the subscription was created without a mailbox. */
381         struct ast_taskprocessor *mailbox;
382         /*! Callback function for incoming message processing. */
383         stasis_subscription_cb callback;
384         /*! Data pointer to be handed to the callback. */
385         void *data;
386
387         /*! Condition for joining with subscription; waited on with the subscription's own ao2 lock. */
388         ast_cond_t join_cond;
389         /*! Flag set when final message for sub has been received.
390          *  Hold the subscription's ao2 lock before reading/setting. */
391         int final_message_rxed;
392         /*! Flag set when final message for sub has been processed.
393          *  Hold the subscription's ao2 lock before reading/setting. */
394         int final_message_processed;
395 };
396
397 static void subscription_dtor(void *obj)
398 {
399         struct stasis_subscription *sub = obj;
400
401         /* Subscriptions need to be manually unsubscribed before destruction
402          * b/c there's a cyclic reference between topics and subscriptions */
403         ast_assert(!stasis_subscription_is_subscribed(sub));
404         /* If there are any messages in flight to this subscription; that would
405          * be bad. */
406         ast_assert(stasis_subscription_is_done(sub));
407
408         ao2_cleanup(sub->topic);
409         sub->topic = NULL;
410         ast_taskprocessor_unreference(sub->mailbox);
411         sub->mailbox = NULL;
412         ast_cond_destroy(&sub->join_cond);
413 }
414
415 /*!
416  * \brief Invoke the subscription's callback.
417  * \param sub Subscription to invoke.
418  * \param topic Topic message was published to.
419  * \param message Message to send.
420  */
421 static void subscription_invoke(struct stasis_subscription *sub,
422                                   struct stasis_message *message)
423 {
424         /* Notify that the final message has been received */
425         if (stasis_subscription_final_message(sub, message)) {
426                 SCOPED_AO2LOCK(lock, sub);
427
428                 sub->final_message_rxed = 1;
429                 ast_cond_signal(&sub->join_cond);
430         }
431
432         /* Since sub is mostly immutable, no need to lock sub */
433         sub->callback(sub->data, sub, message);
434
435         /* Notify that the final message has been processed */
436         if (stasis_subscription_final_message(sub, message)) {
437                 SCOPED_AO2LOCK(lock, sub);
438
439                 sub->final_message_processed = 1;
440                 ast_cond_signal(&sub->join_cond);
441         }
442 }
443
444 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
445 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
446
/*!
 * \brief Subscription callback that ignores every message it is given.
 *
 * \param data Unused.
 * \param sub Unused.
 * \param message Unused.
 */
447 void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
448 {
        /* Intentionally empty. */
449 }
450
451 struct stasis_subscription *internal_stasis_subscribe(
452         struct stasis_topic *topic,
453         stasis_subscription_cb callback,
454         void *data,
455         int needs_mailbox,
456         int use_thread_pool)
457 {
458         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
459
460         if (!topic) {
461                 return NULL;
462         }
463
464         /* The ao2 lock is used for join_cond. */
465         sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
466         if (!sub) {
467                 return NULL;
468         }
469         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
470
471         if (needs_mailbox) {
472                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
473
474                 /* Create name with seq number appended. */
475                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
476                         use_thread_pool ? 'p' : 'm',
477                         stasis_topic_name(topic));
478
479                 /*
480                  * With a small number of subscribers, a thread-per-sub is
481                  * acceptable. For a large number of subscribers, a thread
482                  * pool should be used.
483                  */
484                 if (use_thread_pool) {
485                         sub->mailbox = ast_threadpool_serializer(tps_name, pool);
486                 } else {
487                         sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
488                 }
489                 if (!sub->mailbox) {
490                         return NULL;
491                 }
492                 ast_taskprocessor_set_local(sub->mailbox, sub);
493                 /* Taskprocessor has a reference */
494                 ao2_ref(sub, +1);
495         }
496
497         ao2_ref(topic, +1);
498         sub->topic = topic;
499         sub->callback = callback;
500         sub->data = data;
501         ast_cond_init(&sub->join_cond, NULL);
502
503         if (topic_add_subscription(topic, sub) != 0) {
504                 return NULL;
505         }
506         send_subscription_subscribe(topic, sub);
507
508         ao2_ref(sub, +1);
509         return sub;
510 }
511
512 struct stasis_subscription *stasis_subscribe(
513         struct stasis_topic *topic,
514         stasis_subscription_cb callback,
515         void *data)
516 {
517         return internal_stasis_subscribe(topic, callback, data, 1, 0);
518 }
519
520 struct stasis_subscription *stasis_subscribe_pool(
521         struct stasis_topic *topic,
522         stasis_subscription_cb callback,
523         void *data)
524 {
525         return internal_stasis_subscribe(topic, callback, data, 1, 1);
526 }
527
/*!
 * \internal
 * \brief Taskprocessor task that drops one reference on a subscription.
 *
 * \param data The subscription (as a void pointer) to release.
 *
 * \return Always 0.
 */
static int sub_cleanup(void *data)
{
	ao2_cleanup((struct stasis_subscription *) data);

	return 0;
}
534
535 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
536 {
537         /* The subscription may be the last ref to this topic. Hold
538          * the topic ref open until after the unlock. */
539         RAII_VAR(struct stasis_topic *, topic,
540                 ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
541
542         if (!sub) {
543                 return NULL;
544         }
545
546         /* We have to remove the subscription first, to ensure the unsubscribe
547          * is the final message */
548         if (topic_remove_subscription(sub->topic, sub) != 0) {
549                 ast_log(LOG_ERROR,
550                         "Internal error: subscription has invalid topic\n");
551                 return NULL;
552         }
553
554         /* Now let everyone know about the unsubscribe */
555         send_subscription_unsubscribe(topic, sub);
556
557         /* When all that's done, remove the ref the mailbox has on the sub */
558         if (sub->mailbox) {
559                 ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
560         }
561
562         /* Unsubscribing unrefs the subscription */
563         ao2_cleanup(sub);
564         return NULL;
565 }
566
567 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
568         long low_water, long high_water)
569 {
570         int res = -1;
571
572         if (subscription) {
573                 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
574                         low_water, high_water);
575         }
576         return res;
577 }
578
579 void stasis_subscription_join(struct stasis_subscription *subscription)
580 {
581         if (subscription) {
582                 SCOPED_AO2LOCK(lock, subscription);
583
584                 /* Wait until the processed flag has been set */
585                 while (!subscription->final_message_processed) {
586                         ast_cond_wait(&subscription->join_cond,
587                                 ao2_object_get_lockaddr(subscription));
588                 }
589         }
590 }
591
592 int stasis_subscription_is_done(struct stasis_subscription *subscription)
593 {
594         if (subscription) {
595                 SCOPED_AO2LOCK(lock, subscription);
596
597                 return subscription->final_message_rxed;
598         }
599
600         /* Null subscription is about as done as you can get */
601         return 1;
602 }
603
604 struct stasis_subscription *stasis_unsubscribe_and_join(
605         struct stasis_subscription *subscription)
606 {
607         if (!subscription) {
608                 return NULL;
609         }
610
611         /* Bump refcount to hold it past the unsubscribe */
612         ao2_ref(subscription, +1);
613         stasis_unsubscribe(subscription);
614         stasis_subscription_join(subscription);
615         /* Now decrement the refcount back */
616         ao2_cleanup(subscription);
617         return NULL;
618 }
619
620 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
621 {
622         if (sub) {
623                 size_t i;
624                 struct stasis_topic *topic = sub->topic;
625                 SCOPED_AO2LOCK(lock_topic, topic);
626
627                 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
628                         if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
629                                 return 1;
630                         }
631                 }
632         }
633
634         return 0;
635 }
636
637 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
638 {
639         return sub->uniqueid;
640 }
641
642 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
643 {
644         struct stasis_subscription_change *change;
645
646         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
647                 return 0;
648         }
649
650         change = stasis_message_data(msg);
651         if (strcmp("Unsubscribe", change->description)) {
652                 return 0;
653         }
654
655         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
656                 return 0;
657         }
658
659         return 1;
660 }
661
662 /*!
663  * \brief Add a subscriber to a topic.
664  * \param topic Topic
665  * \param sub Subscriber
666  * \return 0 on success
667  * \return Non-zero on error
668  */
669 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
670 {
671         size_t idx;
672         SCOPED_AO2LOCK(lock, topic);
673
674         /* The reference from the topic to the subscription is shared with
675          * the owner of the subscription, which will explicitly unsubscribe
676          * to release it.
677          *
678          * If we bumped the refcount here, the owner would have to unsubscribe
679          * and cleanup, which is a bit awkward. */
680         AST_VECTOR_APPEND(&topic->subscribers, sub);
681
682         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
683                 topic_add_subscription(
684                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
685         }
686
687         return 0;
688 }
689
690 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
691 {
692         size_t idx;
693         SCOPED_AO2LOCK(lock_topic, topic);
694
695         for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
696                 topic_remove_subscription(
697                         AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
698         }
699
700         return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
701                 AST_VECTOR_ELEM_CLEANUP_NOOP);
702 }
703
704 /*!
705  * \internal \brief Dispatch a message to a subscriber asynchronously
706  * \param local \ref ast_taskprocessor_local object
707  * \return 0
708  */
709 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
710 {
711         struct stasis_subscription *sub = local->local_data;
712         struct stasis_message *message = local->data;
713
714         subscription_invoke(sub, message);
715         ao2_cleanup(message);
716
717         return 0;
718 }
719
/*!
 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
 * a published message to a subscriber
 *
 * The publisher waits on \a cond until \a complete becomes non-zero.
 */
struct sync_task_data {
        /*! Mutex held while \a complete is written and while waiting on \a cond */
        ast_mutex_t lock;
        /*! Signalled by dispatch_exec_sync() after the subscriber was invoked */
        ast_cond_t cond;
        /*! Set to 1 by dispatch_exec_sync() once the message has been handled */
        int complete;
        /*! The stasis message being dispatched */
        void *task_data;
};
730
731 /*!
732  * \internal \brief Dispatch a message to a subscriber synchronously
733  * \param local \ref ast_taskprocessor_local object
734  * \return 0
735  */
736 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
737 {
738         struct stasis_subscription *sub = local->local_data;
739         struct sync_task_data *std = local->data;
740         struct stasis_message *message = std->task_data;
741
742         subscription_invoke(sub, message);
743         ao2_cleanup(message);
744
745         ast_mutex_lock(&std->lock);
746         std->complete = 1;
747         ast_cond_signal(&std->cond);
748         ast_mutex_unlock(&std->lock);
749
750         return 0;
751 }
752
753 /*!
754  * \internal \brief Dispatch a message to a subscriber
755  * \param sub The subscriber to dispatch to
756  * \param message The message to send
757  * \param synchronous If non-zero, synchronize on the subscriber receiving
758  * the message
759  */
760 static void dispatch_message(struct stasis_subscription *sub,
761         struct stasis_message *message,
762         int synchronous)
763 {
764         if (!sub->mailbox) {
765                 /* Dispatch directly */
766                 subscription_invoke(sub, message);
767                 return;
768         }
769
770         /* Bump the message for the taskprocessor push. This will get de-ref'd
771          * by the task processor callback.
772          */
773         ao2_bump(message);
774         if (!synchronous) {
775                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
776                         /* Push failed; ugh. */
777                         ast_log(LOG_ERROR, "Dropping async dispatch\n");
778                         ao2_cleanup(message);
779                 }
780         } else {
781                 struct sync_task_data std;
782
783                 ast_mutex_init(&std.lock);
784                 ast_cond_init(&std.cond, NULL);
785                 std.complete = 0;
786                 std.task_data = message;
787
788                 if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
789                         /* Push failed; ugh. */
790                         ast_log(LOG_ERROR, "Dropping sync dispatch\n");
791                         ao2_cleanup(message);
792                         ast_mutex_destroy(&std.lock);
793                         ast_cond_destroy(&std.cond);
794                         return;
795                 }
796
797                 ast_mutex_lock(&std.lock);
798                 while (!std.complete) {
799                         ast_cond_wait(&std.cond, &std.lock);
800                 }
801                 ast_mutex_unlock(&std.lock);
802
803                 ast_mutex_destroy(&std.lock);
804                 ast_cond_destroy(&std.cond);
805         }
806 }
807
808 /*!
809  * \internal \brief Publish a message to a topic's subscribers
810  * \brief topic The topic to publish to
811  * \brief message The message to publish
812  * \brief sync_sub An optional subscriber of the topic to publish synchronously
813  * to
814  */
815 static void publish_msg(struct stasis_topic *topic,
816         struct stasis_message *message, struct stasis_subscription *sync_sub)
817 {
818         size_t i;
819
820         ast_assert(topic != NULL);
821         ast_assert(message != NULL);
822
823         /*
824          * The topic may be unref'ed by the subscription invocation.
825          * Make sure we hold onto a reference while dispatching.
826          */
827         ao2_ref(topic, +1);
828         ao2_lock(topic);
829         for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
830                 struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
831
832                 ast_assert(sub != NULL);
833
834                 dispatch_message(sub, message, (sub == sync_sub));
835         }
836         ao2_unlock(topic);
837         ao2_ref(topic, -1);
838 }
839
/*!
 * \brief Publish a message to every subscriber of a topic.
 *
 * No subscriber is singled out for synchronous delivery.
 *
 * \param topic Topic to publish to.
 * \param message Message to publish.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
        struct stasis_subscription *no_sync_sub = NULL;

        publish_msg(topic, message, no_sync_sub);
}
844
845 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
846 {
847         ast_assert(sub != NULL);
848
849         publish_msg(sub->topic, message, sub);
850 }
851
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * In cases where both the \a from_topic and \a to_topic need to be locked,
 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
 *
 * Both members are left NULL for a forward from a topic to itself; see
 * stasis_forward_all(). A reference is held on each non-NULL member and
 * is released by forward_dtor().
 */
struct stasis_forward {
        /*! Originating topic */
        struct stasis_topic *from_topic;
        /*! Destination topic */
        struct stasis_topic *to_topic;
};
866
867 static void forward_dtor(void *obj)
868 {
869         struct stasis_forward *forward = obj;
870
871         ao2_cleanup(forward->from_topic);
872         forward->from_topic = NULL;
873         ao2_cleanup(forward->to_topic);
874         forward->to_topic = NULL;
875 }
876
877 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
878 {
879         int idx;
880         struct stasis_topic *from;
881         struct stasis_topic *to;
882
883         if (!forward) {
884                 return NULL;
885         }
886
887         from = forward->from_topic;
888         to = forward->to_topic;
889
890         if (from && to) {
891                 topic_lock_both(to, from);
892                 AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
893                         AST_VECTOR_ELEM_CLEANUP_NOOP);
894
895                 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
896                         topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
897                 }
898                 ao2_unlock(from);
899                 ao2_unlock(to);
900         }
901
902         ao2_cleanup(forward);
903
904         return NULL;
905 }
906
907 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
908         struct stasis_topic *to_topic)
909 {
910         int res;
911         size_t idx;
912         RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
913
914         if (!from_topic || !to_topic) {
915                 return NULL;
916         }
917
918         forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
919         if (!forward) {
920                 return NULL;
921         }
922
923         /* Forwards to ourselves are implicit. */
924         if (to_topic == from_topic) {
925                 return ao2_bump(forward);
926         }
927
928         forward->from_topic = ao2_bump(from_topic);
929         forward->to_topic = ao2_bump(to_topic);
930
931         topic_lock_both(to_topic, from_topic);
932         res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
933         if (res != 0) {
934                 ao2_unlock(from_topic);
935                 ao2_unlock(to_topic);
936                 return NULL;
937         }
938
939         for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
940                 topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
941         }
942         ao2_unlock(from_topic);
943         ao2_unlock(to_topic);
944
945         return ao2_bump(forward);
946 }
947
948 static void subscription_change_dtor(void *obj)
949 {
950         struct stasis_subscription_change *change = obj;
951
952         ast_string_field_free_memory(change);
953         ao2_cleanup(change->topic);
954 }
955
956 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
957 {
958         struct stasis_subscription_change *change;
959
960         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
961         if (!change || ast_string_field_init(change, 128)) {
962                 ao2_cleanup(change);
963                 return NULL;
964         }
965
966         ast_string_field_set(change, uniqueid, uniqueid);
967         ast_string_field_set(change, description, description);
968         ao2_ref(topic, +1);
969         change->topic = topic;
970
971         return change;
972 }
973
974 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
975 {
976         struct stasis_subscription_change *change;
977         struct stasis_message *msg;
978
979         /* This assumes that we have already unsubscribed */
980         ast_assert(stasis_subscription_is_subscribed(sub));
981
982         if (!stasis_subscription_change_type()) {
983                 return;
984         }
985
986         change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
987         if (!change) {
988                 return;
989         }
990
991         msg = stasis_message_create(stasis_subscription_change_type(), change);
992         if (!msg) {
993                 ao2_cleanup(change);
994                 return;
995         }
996
997         stasis_publish(topic, msg);
998         ao2_cleanup(msg);
999         ao2_cleanup(change);
1000 }
1001
1002 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1003         struct stasis_subscription *sub)
1004 {
1005         struct stasis_subscription_change *change;
1006         struct stasis_message *msg;
1007
1008         /* This assumes that we have already unsubscribed */
1009         ast_assert(!stasis_subscription_is_subscribed(sub));
1010
1011         if (!stasis_subscription_change_type()) {
1012                 return;
1013         }
1014
1015         change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1016         if (!change) {
1017                 return;
1018         }
1019
1020         msg = stasis_message_create(stasis_subscription_change_type(), change);
1021         if (!msg) {
1022                 ao2_cleanup(change);
1023                 return;
1024         }
1025
1026         stasis_publish(topic, msg);
1027
1028         /* Now we have to dispatch to the subscription itself */
1029         dispatch_message(sub, msg, 0);
1030
1031         ao2_cleanup(msg);
1032         ao2_cleanup(change);
1033 }
1034
/*! \brief One pooled topic together with its forward to the pool's topic */
struct topic_pool_entry {
        /*! Forward from \a topic; cancelled by topic_pool_entry_dtor() */
        struct stasis_forward *forward;
        /*! The pooled topic; its name is the container key */
        struct stasis_topic *topic;
};
1039
1040 static void topic_pool_entry_dtor(void *obj)
1041 {
1042         struct topic_pool_entry *entry = obj;
1043
1044         entry->forward = stasis_forward_cancel(entry->forward);
1045         ao2_cleanup(entry->topic);
1046         entry->topic = NULL;
1047 }
1048
1049 static struct topic_pool_entry *topic_pool_entry_alloc(void)
1050 {
1051         return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
1052                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1053 }
1054
/*! \brief A pool of named topics that all forward to one topic */
struct stasis_topic_pool {
        /*! Container of \ref topic_pool_entry objects, keyed by topic name */
        struct ao2_container *pool_container;
        /*! Topic that every pooled topic is forwarded to */
        struct stasis_topic *pool_topic;
};
1059
1060 static void topic_pool_dtor(void *obj)
1061 {
1062         struct stasis_topic_pool *pool = obj;
1063
1064         ao2_cleanup(pool->pool_container);
1065         pool->pool_container = NULL;
1066         ao2_cleanup(pool->pool_topic);
1067         pool->pool_topic = NULL;
1068 }
1069
1070 static int topic_pool_entry_hash(const void *obj, const int flags)
1071 {
1072         const struct topic_pool_entry *object;
1073         const char *key;
1074
1075         switch (flags & OBJ_SEARCH_MASK) {
1076         case OBJ_SEARCH_KEY:
1077                 key = obj;
1078                 break;
1079         case OBJ_SEARCH_OBJECT:
1080                 object = obj;
1081                 key = stasis_topic_name(object->topic);
1082                 break;
1083         default:
1084                 /* Hash can only work on something with a full key. */
1085                 ast_assert(0);
1086                 return 0;
1087         }
1088         return ast_str_case_hash(key);
1089 }
1090
1091 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1092 {
1093         const struct topic_pool_entry *object_left = obj;
1094         const struct topic_pool_entry *object_right = arg;
1095         const char *right_key = arg;
1096         int cmp;
1097
1098         switch (flags & OBJ_SEARCH_MASK) {
1099         case OBJ_SEARCH_OBJECT:
1100                 right_key = stasis_topic_name(object_right->topic);
1101                 /* Fall through */
1102         case OBJ_SEARCH_KEY:
1103                 cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
1104                 break;
1105         case OBJ_SEARCH_PARTIAL_KEY:
1106                 /* Not supported by container */
1107                 ast_assert(0);
1108                 cmp = -1;
1109                 break;
1110         default:
1111                 /*
1112                  * What arg points to is specific to this traversal callback
1113                  * and has no special meaning to astobj2.
1114                  */
1115                 cmp = 0;
1116                 break;
1117         }
1118         if (cmp) {
1119                 return 0;
1120         }
1121         /*
1122          * At this point the traversal callback is identical to a sorted
1123          * container.
1124          */
1125         return CMP_MATCH;
1126 }
1127
1128 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
1129 {
1130         struct stasis_topic_pool *pool;
1131
1132         pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1133         if (!pool) {
1134                 return NULL;
1135         }
1136
1137         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
1138                 topic_pool_entry_hash, topic_pool_entry_cmp);
1139         if (!pool->pool_container) {
1140                 ao2_cleanup(pool);
1141                 return NULL;
1142         }
1143         ao2_ref(pooled_topic, +1);
1144         pool->pool_topic = pooled_topic;
1145
1146         return pool;
1147 }
1148
1149 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1150 {
1151         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1152         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1153
1154         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1155         if (topic_pool_entry) {
1156                 return topic_pool_entry->topic;
1157         }
1158
1159         topic_pool_entry = topic_pool_entry_alloc();
1160         if (!topic_pool_entry) {
1161                 return NULL;
1162         }
1163
1164         topic_pool_entry->topic = stasis_topic_create(topic_name);
1165         if (!topic_pool_entry->topic) {
1166                 return NULL;
1167         }
1168
1169         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1170         if (!topic_pool_entry->forward) {
1171                 return NULL;
1172         }
1173
1174         if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1175                 return NULL;
1176         }
1177
1178         return topic_pool_entry->topic;
1179 }
1180
/*!
 * \brief Log use of a message type accessor outside of its lifetime.
 *
 * The error is only logged in AST_DEVMODE builds; otherwise this is a
 * no-op.
 *
 * \param name Name of the accessor function that was used.
 */
void stasis_log_bad_type_access(const char *name)
{
#ifdef AST_DEVMODE
        ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
#endif
}
1187
/*!
 * \brief A multi object blob data structure to carry user event stasis messages
 *
 * The blob reference and every stored snapshot are released by
 * multi_object_blob_dtor().
 */
struct ast_multi_object_blob {
        struct ast_json *blob;                             /*!< A blob of JSON data */
        AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX];   /*!< Vector of snapshots for each type */
};
1193
1194 /*!
1195  * \internal
1196  * \brief Destructor for \ref ast_multi_object_blob objects
1197  */
1198 static void multi_object_blob_dtor(void *obj)
1199 {
1200         struct ast_multi_object_blob *multi = obj;
1201         int type;
1202         int i;
1203
1204         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1205                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1206                         ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1207                 }
1208                 AST_VECTOR_FREE(&multi->snapshots[type]);
1209         }
1210         ast_json_unref(multi->blob);
1211 }
1212
1213 /*! \brief Create a stasis user event multi object blob */
1214 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
1215 {
1216         int type;
1217         RAII_VAR(struct ast_multi_object_blob *, multi,
1218                         ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
1219                         ao2_cleanup);
1220
1221         ast_assert(blob != NULL);
1222
1223         if (!multi) {
1224                 return NULL;
1225         }
1226
1227         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1228                 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1229                         return NULL;
1230                 }
1231         }
1232
1233         multi->blob = ast_json_ref(blob);
1234
1235         ao2_ref(multi, +1);
1236         return multi;
1237 }
1238
1239 /*! \brief Add an object (snapshot) to the blob */
1240 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
1241         enum stasis_user_multi_object_snapshot_type type, void *object)
1242 {
1243         if (!multi || !object) {
1244                 return;
1245         }
1246         AST_VECTOR_APPEND(&multi->snapshots[type],object);
1247 }
1248
1249 /*! \brief Publish single channel user event (for app_userevent compatibility) */
1250 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
1251         struct stasis_message_type *type, struct ast_json *blob)
1252 {
1253         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1254         RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup);
1255         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1256
1257         if (!type) {
1258                 return;
1259         }
1260
1261         multi = ast_multi_object_blob_create(blob);
1262         if (!multi) {
1263                 return;
1264         }
1265
1266         channel_snapshot = ast_channel_snapshot_create(chan);
1267         ao2_ref(channel_snapshot, +1);
1268         ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
1269
1270         message = stasis_message_create(type, multi);
1271         if (message) {
1272                 /* app_userevent still publishes to channel */
1273                 stasis_publish(ast_channel_topic(chan), message);
1274         }
1275 }
1276
1277 /*! \internal \brief convert multi object blob to ari json */
1278 static struct ast_json *multi_user_event_to_json(
1279         struct stasis_message *message,
1280         const struct stasis_message_sanitizer *sanitize)
1281 {
1282         RAII_VAR(struct ast_json *, out, NULL, ast_json_unref);
1283         struct ast_multi_object_blob *multi = stasis_message_data(message);
1284         struct ast_json *blob = multi->blob;
1285         const struct timeval *tv = stasis_message_timestamp(message);
1286         enum stasis_user_multi_object_snapshot_type type;
1287         int i;
1288
1289         out = ast_json_object_create();
1290         if (!out) {
1291                 return NULL;
1292         }
1293
1294         ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
1295         ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
1296         ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
1297         ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
1298
1299         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1300                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1301                         struct ast_json *json_object = NULL;
1302                         char *name = NULL;
1303                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1304
1305                         switch (type) {
1306                         case STASIS_UMOS_CHANNEL:
1307                                 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
1308                                 name = "channel";
1309                                 break;
1310                         case STASIS_UMOS_BRIDGE:
1311                                 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
1312                                 name = "bridge";
1313                                 break;
1314                         case STASIS_UMOS_ENDPOINT:
1315                                 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
1316                                 name = "endpoint";
1317                                 break;
1318                         }
1319                         if (json_object) {
1320                                 ast_json_object_set(out, name, json_object);
1321                         }
1322                 }
1323         }
1324         return ast_json_ref(out);
1325 }
1326
1327 /*! \internal \brief convert multi object blob to ami string */
1328 static struct ast_str *multi_object_blob_to_ami(void *obj)
1329 {
1330         struct ast_str *ami_str=ast_str_create(1024);
1331         struct ast_str *ami_snapshot;
1332         const struct ast_multi_object_blob *multi = obj;
1333         enum stasis_user_multi_object_snapshot_type type;
1334         int i;
1335
1336         if (!ami_str) {
1337                 return NULL;
1338         }
1339         if (!multi) {
1340                 ast_free(ami_str);
1341                 return NULL;
1342         }
1343
1344         for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1345                 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1346                         char *name = "";
1347                         void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
1348                         ami_snapshot = NULL;
1349
1350                         if (i > 0) {
1351                                 ast_asprintf(&name, "%d", i + 1);
1352                         }
1353
1354                         switch (type) {
1355                         case STASIS_UMOS_CHANNEL:
1356                                 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name);
1357                                 break;
1358
1359                         case STASIS_UMOS_BRIDGE:
1360                                 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name);
1361                                 break;
1362
1363                         case STASIS_UMOS_ENDPOINT:
1364                                 /* currently not sending endpoint snapshots to AMI */
1365                                 break;
1366                         }
1367                         if (ami_snapshot) {
1368                                 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
1369                                 ast_free(ami_snapshot);
1370                         }
1371                 }
1372         }
1373
1374         return ami_str;
1375 }
1376
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key Name of the blob field under consideration
 *
 * \retval 1 if \a key is exactly "eventname"
 * \retval 0 for every other key
 */
static int userevent_exclusion_cb(const char *key)
{
        return strcmp("eventname", key) == 0;
}
1385
1386 static struct ast_manager_event_blob *multi_user_event_to_ami(
1387         struct stasis_message *message)
1388 {
1389         RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
1390         RAII_VAR(struct ast_str *, body, NULL, ast_free);
1391         struct ast_multi_object_blob *multi = stasis_message_data(message);
1392         const char *eventname;
1393
1394         eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
1395         body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
1396         object_string = multi_object_blob_to_ami(multi);
1397         if (!object_string || !body) {
1398                 return NULL;
1399         }
1400
1401         return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
1402                 "%s"
1403                 "UserEvent: %s\r\n"
1404                 "%s",
1405                 ast_str_buffer(object_string),
1406                 eventname,
1407                 ast_str_buffer(body));
1408 }
1409
/*! \brief A structure to hold global configuration-related options */
struct stasis_declined_config {
        /*! The list of message types to decline; a string container of type names */
        struct ao2_container *declined;
};
1415
/*! \brief Threadpool configuration options */
struct stasis_threadpool_conf {
        /*! Initial size of the thread pool */
        int initial_size;
        /*! Time, in seconds, before we expire a thread */
        int idle_timeout_sec;
        /*! Maximum number of threads to allow */
        int max_size;
};
1425
/*! \brief Top-level configuration object for stasis.conf */
struct stasis_config {
        /*! Thread pool configuration options; plain allocation freed with ast_free() */
        struct stasis_threadpool_conf *threadpool_options;
        /*! Declined message types; ao2 object released with ao2_cleanup() */
        struct stasis_declined_config *declined_message_types;
};
1432
/*! \brief An aco_type structure to link the "threadpool" category to the stasis_threadpool_conf type */
static struct aco_type threadpool_option = {
        .type = ACO_GLOBAL,
        .name = "threadpool",
        .item_offset = offsetof(struct stasis_config, threadpool_options),
        .category = "^threadpool$",
        .category_match = ACO_WHITELIST,
};

/*! \brief Type list containing only the "threadpool" aco_type */
static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
1442
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
        .type = ACO_GLOBAL,
        .name = "declined_message_types",
        .item_offset = offsetof(struct stasis_config, declined_message_types),
        .category_match = ACO_WHITELIST,
        .category = "^declined_message_types$",
};

/*!
 * \brief Type list containing only the "declined_message_types" aco_type
 *
 * NOTE(review): unlike threadpool_options this is not declared static;
 * confirm whether external linkage is intended.
 */
struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
1453
/*!
 * \brief Description of stasis.conf and the category types it may contain
 *
 * NOTE(review): not declared static; confirm whether external linkage is
 * intended.
 */
struct aco_file stasis_conf = {
        .filename = "stasis.conf",
        .types = ACO_TYPES(&declined_option, &threadpool_option),
};
1458
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
static AO2_GLOBAL_OBJ_STATIC(globals);

/* Forward declaration: the allocator is named in CONFIG_INFO_CORE below but defined later */
static void *stasis_config_alloc(void);

/*! \brief Register information about the configs being processed by this module */
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
        .files = ACO_FILES(&stasis_conf),
);
1468
1469 static void stasis_declined_config_destructor(void *obj)
1470 {
1471         struct stasis_declined_config *declined = obj;
1472
1473         ao2_cleanup(declined->declined);
1474 }
1475
1476 static void stasis_config_destructor(void *obj)
1477 {
1478         struct stasis_config *cfg = obj;
1479
1480         ao2_cleanup(cfg->declined_message_types);
1481         ast_free(cfg->threadpool_options);
1482 }
1483
1484 static void *stasis_config_alloc(void)
1485 {
1486         struct stasis_config *cfg;
1487
1488         if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
1489                 return NULL;
1490         }
1491
1492         cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
1493         if (!cfg->threadpool_options) {
1494                 ao2_ref(cfg, -1);
1495                 return NULL;
1496         }
1497
1498         cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
1499                 stasis_declined_config_destructor);
1500         if (!cfg->declined_message_types) {
1501                 ao2_ref(cfg, -1);
1502                 return NULL;
1503         }
1504
1505         cfg->declined_message_types->declined = ast_str_container_alloc(13);
1506         if (!cfg->declined_message_types->declined) {
1507                 ao2_ref(cfg, -1);
1508                 return NULL;
1509         }
1510
1511         return cfg;
1512 }
1513
1514 int stasis_message_type_declined(const char *name)
1515 {
1516         RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
1517         char *name_in_declined;
1518         int res;
1519
1520         if (!cfg || !cfg->declined_message_types) {
1521                 return 0;
1522         }
1523
1524         name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
1525         res = name_in_declined ? 1 : 0;
1526         ao2_cleanup(name_in_declined);
1527         if (res) {
1528                 ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
1529         }
1530         return res;
1531 }
1532
1533 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1534 {
1535         struct stasis_declined_config *declined = obj;
1536
1537         if (ast_strlen_zero(var->value)) {
1538                 return 0;
1539         }
1540
1541         if (ast_str_container_add(declined->declined, var->value)) {
1542                 return -1;
1543         }
1544
1545         return 0;
1546 }
1547
1548 /*!
1549  * @{ \brief Define multi user event message type(s).
1550  */
1551
1552 STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
1553         .to_json = multi_user_event_to_json,
1554         .to_ami = multi_user_event_to_ami,
1555         );
1556
1557 /*! @} */
1558
1559 /*! \brief Cleanup function for graceful shutdowns */
1560 static void stasis_cleanup(void)
1561 {
1562         ast_threadpool_shutdown(pool);
1563         pool = NULL;
1564         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
1565         STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
1566         aco_info_destroy(&cfg_info);
1567         ao2_global_obj_release(globals);
1568 }
1569
1570 int stasis_init(void)
1571 {
1572         RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
1573         int cache_init;
1574         struct ast_threadpool_options threadpool_opts = { 0, };
1575
1576         /* Be sure the types are cleaned up after the message bus */
1577         ast_register_cleanup(stasis_cleanup);
1578
1579         if (aco_info_init(&cfg_info)) {
1580                 return -1;
1581         }
1582
1583         aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
1584                 declined_options, "", declined_handler, 0);
1585         aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
1586                 threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
1587                 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
1588                 INT_MAX);
1589         aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
1590                 threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
1591                 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
1592                 INT_MAX);
1593         aco_option_register(&cfg_info, "max_size", ACO_EXACT,
1594                 threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
1595                 FLDSET(struct stasis_threadpool_conf, max_size), 0,
1596                 INT_MAX);
1597
1598         if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
1599                 struct stasis_config *default_cfg = stasis_config_alloc();
1600
1601                 if (!default_cfg) {
1602                         return -1;
1603                 }
1604
1605                 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
1606                         ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
1607                         ao2_ref(default_cfg, -1);
1608                         return -1;
1609                 }
1610
1611                 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
1612                         ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
1613                         return -1;
1614                 }
1615
1616                 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
1617                 ao2_global_obj_replace_unref(globals, default_cfg);
1618                 cfg = default_cfg;
1619         } else {
1620                 cfg = ao2_global_obj_ref(globals);
1621                 if (!cfg) {
1622                         ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
1623                         return -1;
1624                 }
1625         }
1626
1627         threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
1628         threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
1629         threadpool_opts.auto_increment = 1;
1630         threadpool_opts.max_size = cfg->threadpool_options->max_size;
1631         threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
1632         pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
1633         if (!pool) {
1634                 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
1635                 return -1;
1636         }
1637
1638         cache_init = stasis_cache_init();
1639         if (cache_init != 0) {
1640                 return -1;
1641         }
1642
1643         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
1644                 return -1;
1645         }
1646         if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
1647                 return -1;
1648         }
1649
1650         return 0;
1651 }
1652