res/stasis: Add CLI commands for displaying/debugging ARI apps
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 #include "asterisk/astobj2.h"
57 #include "asterisk/callerid.h"
58 #include "asterisk/module.h"
59 #include "asterisk/stasis_app_impl.h"
60 #include "asterisk/stasis_channels.h"
61 #include "asterisk/stasis_bridges.h"
62 #include "asterisk/stasis_endpoints.h"
63 #include "asterisk/stasis_message_router.h"
64 #include "asterisk/strings.h"
65 #include "stasis/app.h"
66 #include "stasis/control.h"
67 #include "stasis/messaging.h"
68 #include "stasis/cli.h"
69 #include "stasis/stasis_bridge.h"
70 #include "asterisk/core_unreal.h"
71 #include "asterisk/musiconhold.h"
72 #include "asterisk/causes.h"
73 #include "asterisk/stringfields.h"
74 #include "asterisk/bridge_after.h"
75 #include "asterisk/format_cache.h"
76
77 /*! Time to wait for a frame in the application */
78 #define MAX_WAIT_MS 200
79
80 /*!
81  * \brief Number of buckets for the Stasis application hash table.  Remember to
82  * keep it a prime number!
83  */
84 #define APPS_NUM_BUCKETS 127
85
86 /*!
87  * \brief Number of buckets for the Stasis application hash table.  Remember to
88  * keep it a prime number!
89  */
90 #define CONTROLS_NUM_BUCKETS 127
91
92 /*!
93  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
94  * keep it a prime number!
95  */
96 #define BRIDGES_NUM_BUCKETS 127
97
98 /*!
99  * \brief Stasis application container.
100  */
101 struct ao2_container *apps_registry;
102
103 struct ao2_container *app_controls;
104
105 struct ao2_container *app_bridges;
106
107 struct ao2_container *app_bridges_moh;
108
109 struct ao2_container *app_bridges_playback;
110
111 /*!
112  * \internal \brief List of registered event sources.
113  */
114 AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
115
116 static struct ast_json *stasis_end_to_json(struct stasis_message *message,
117                 const struct stasis_message_sanitizer *sanitize)
118 {
119         struct ast_channel_blob *payload = stasis_message_data(message);
120
121         if (sanitize && sanitize->channel_snapshot &&
122                         sanitize->channel_snapshot(payload->snapshot)) {
123                 return NULL;
124         }
125
126         return ast_json_pack("{s: s, s: o, s: o}",
127                 "type", "StasisEnd",
128                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
129                 "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
130 }
131
132 STASIS_MESSAGE_TYPE_DEFN_LOCAL(end_message_type,
133         .to_json = stasis_end_to_json);
134
135 struct start_message_blob {
136         struct ast_channel_snapshot *channel;           /*!< Channel that is entering Stasis() */
137         struct ast_channel_snapshot *replace_channel;   /*!< Channel that is being replaced (optional) */
138         struct ast_json *blob;                          /*!< JSON blob containing timestamp and args */
139 };
140
141 static struct ast_json *stasis_start_to_json(struct stasis_message *message,
142                 const struct stasis_message_sanitizer *sanitize)
143 {
144         struct start_message_blob *payload = stasis_message_data(message);
145         struct ast_json *msg;
146
147         if (sanitize && sanitize->channel_snapshot &&
148                         sanitize->channel_snapshot(payload->channel)) {
149                 return NULL;
150         }
151
152         msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
153                 "type", "StasisStart",
154                 "timestamp", ast_json_copy(ast_json_object_get(payload->blob, "timestamp")),
155                 "args", ast_json_deep_copy(ast_json_object_get(payload->blob, "args")),
156                 "channel", ast_channel_snapshot_to_json(payload->channel, NULL));
157         if (!msg) {
158                 ast_log(LOG_ERROR, "Failed to pack JSON for StasisStart message\n");
159                 return NULL;
160         }
161
162         if (payload->replace_channel) {
163                 int res = ast_json_object_set(msg, "replace_channel",
164                         ast_channel_snapshot_to_json(payload->replace_channel, NULL));
165
166                 if (res) {
167                         ast_json_unref(msg);
168                         ast_log(LOG_ERROR, "Failed to append JSON for StasisStart message\n");
169                         return NULL;
170                 }
171         }
172
173         return msg;
174 }
175
176 STASIS_MESSAGE_TYPE_DEFN_LOCAL(start_message_type,
177         .to_json = stasis_start_to_json);
178
179 const char *stasis_app_name(const struct stasis_app *app)
180 {
181         return app_name(app);
182 }
183
184 /*! AO2 hash function for \ref app */
185 static int app_hash(const void *obj, const int flags)
186 {
187         const struct stasis_app *app;
188         const char *key;
189
190         switch (flags & OBJ_SEARCH_MASK) {
191         case OBJ_SEARCH_KEY:
192                 key = obj;
193                 break;
194         case OBJ_SEARCH_OBJECT:
195                 app = obj;
196                 key = stasis_app_name(app);
197                 break;
198         default:
199                 /* Hash can only work on something with a full key. */
200                 ast_assert(0);
201                 return 0;
202         }
203         return ast_str_hash(key);
204 }
205
206 /*! AO2 comparison function for \ref app */
207 static int app_compare(void *obj, void *arg, int flags)
208 {
209         const struct stasis_app *object_left = obj;
210         const struct stasis_app *object_right = arg;
211         const char *right_key = arg;
212         int cmp;
213
214         switch (flags & OBJ_SEARCH_MASK) {
215         case OBJ_SEARCH_OBJECT:
216                 right_key = stasis_app_name(object_right);
217                 /* Fall through */
218         case OBJ_SEARCH_KEY:
219                 cmp = strcmp(stasis_app_name(object_left), right_key);
220                 break;
221         case OBJ_SEARCH_PARTIAL_KEY:
222                 /*
223                  * We could also use a partial key struct containing a length
224                  * so strlen() does not get called for every comparison instead.
225                  */
226                 cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
227                 break;
228         default:
229                 /*
230                  * What arg points to is specific to this traversal callback
231                  * and has no special meaning to astobj2.
232                  */
233                 cmp = 0;
234                 break;
235         }
236         if (cmp) {
237                 return 0;
238         }
239         /*
240          * At this point the traversal callback is identical to a sorted
241          * container.
242          */
243         return CMP_MATCH;
244 }
245
246 /*! AO2 hash function for \ref stasis_app_control */
247 static int control_hash(const void *obj, const int flags)
248 {
249         const struct stasis_app_control *control;
250         const char *key;
251
252         switch (flags & OBJ_SEARCH_MASK) {
253         case OBJ_SEARCH_KEY:
254                 key = obj;
255                 break;
256         case OBJ_SEARCH_OBJECT:
257                 control = obj;
258                 key = stasis_app_control_get_channel_id(control);
259                 break;
260         default:
261                 /* Hash can only work on something with a full key. */
262                 ast_assert(0);
263                 return 0;
264         }
265         return ast_str_hash(key);
266 }
267
268 /*! AO2 comparison function for \ref stasis_app_control */
269 static int control_compare(void *obj, void *arg, int flags)
270 {
271         const struct stasis_app_control *object_left = obj;
272         const struct stasis_app_control *object_right = arg;
273         const char *right_key = arg;
274         int cmp;
275
276         switch (flags & OBJ_SEARCH_MASK) {
277         case OBJ_SEARCH_OBJECT:
278                 right_key = stasis_app_control_get_channel_id(object_right);
279                 /* Fall through */
280         case OBJ_SEARCH_KEY:
281                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
282                 break;
283         case OBJ_SEARCH_PARTIAL_KEY:
284                 /*
285                  * We could also use a partial key struct containing a length
286                  * so strlen() does not get called for every comparison instead.
287                  */
288                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
289                 break;
290         default:
291                 /*
292                  * What arg points to is specific to this traversal callback
293                  * and has no special meaning to astobj2.
294                  */
295                 cmp = 0;
296                 break;
297         }
298         if (cmp) {
299                 return 0;
300         }
301         /*
302          * At this point the traversal callback is identical to a sorted
303          * container.
304          */
305         return CMP_MATCH;
306 }
307
308 static int cleanup_cb(void *obj, void *arg, int flags)
309 {
310         struct stasis_app *app = obj;
311
312         if (!app_is_finished(app)) {
313                 return 0;
314         }
315
316         ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
317         app_shutdown(app);
318
319         return CMP_MATCH;
320
321 }
322
323 /*!
324  * \brief Clean up any old apps that we don't need any more.
325  */
326 static void cleanup(void)
327 {
328         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
329                 cleanup_cb, NULL);
330 }
331
332 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
333 {
334         return control_create(chan, NULL);
335 }
336
337 struct stasis_app_control *stasis_app_control_find_by_channel(
338         const struct ast_channel *chan)
339 {
340         if (chan == NULL) {
341                 return NULL;
342         }
343
344         return stasis_app_control_find_by_channel_id(
345                 ast_channel_uniqueid(chan));
346 }
347
348 struct stasis_app_control *stasis_app_control_find_by_channel_id(
349         const char *channel_id)
350 {
351         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
352 }
353
354 /*! AO2 hash function for bridges container  */
355 static int bridges_hash(const void *obj, const int flags)
356 {
357         const struct ast_bridge *bridge;
358         const char *key;
359
360         switch (flags & OBJ_SEARCH_MASK) {
361         case OBJ_SEARCH_KEY:
362                 key = obj;
363                 break;
364         case OBJ_SEARCH_OBJECT:
365                 bridge = obj;
366                 key = bridge->uniqueid;
367                 break;
368         default:
369                 /* Hash can only work on something with a full key. */
370                 ast_assert(0);
371                 return 0;
372         }
373         return ast_str_hash(key);
374 }
375
376 /*! AO2 comparison function for bridges container */
377 static int bridges_compare(void *obj, void *arg, int flags)
378 {
379         const struct ast_bridge *object_left = obj;
380         const struct ast_bridge *object_right = arg;
381         const char *right_key = arg;
382         int cmp;
383
384         switch (flags & OBJ_SEARCH_MASK) {
385         case OBJ_SEARCH_OBJECT:
386                 right_key = object_right->uniqueid;
387                 /* Fall through */
388         case OBJ_SEARCH_KEY:
389                 cmp = strcmp(object_left->uniqueid, right_key);
390                 break;
391         case OBJ_SEARCH_PARTIAL_KEY:
392                 /*
393                  * We could also use a partial key struct containing a length
394                  * so strlen() does not get called for every comparison instead.
395                  */
396                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
397                 break;
398         default:
399                 /*
400                  * What arg points to is specific to this traversal callback
401                  * and has no special meaning to astobj2.
402                  */
403                 cmp = 0;
404                 break;
405         }
406         if (cmp) {
407                 return 0;
408         }
409         /*
410          * At this point the traversal callback is identical to a sorted
411          * container.
412          */
413         return CMP_MATCH;
414 }
415
416 /*!
417  *  Used with app_bridges_moh and app_bridge_control, they provide links
418  *  between bridges and channels used for ARI application purposes
419  */
420 struct stasis_app_bridge_channel_wrapper {
421         AST_DECLARE_STRING_FIELDS(
422                 AST_STRING_FIELD(channel_id);
423                 AST_STRING_FIELD(bridge_id);
424         );
425 };
426
427 static void stasis_app_bridge_channel_wrapper_destructor(void *obj)
428 {
429         struct stasis_app_bridge_channel_wrapper *wrapper = obj;
430         ast_string_field_free_memory(wrapper);
431 }
432
433 /*! AO2 hash function for the bridges moh container */
434 static int bridges_channel_hash_fn(const void *obj, const int flags)
435 {
436         const struct stasis_app_bridge_channel_wrapper *wrapper;
437         const char *key;
438
439         switch (flags & OBJ_SEARCH_MASK) {
440         case OBJ_SEARCH_KEY:
441                 key = obj;
442                 break;
443         case OBJ_SEARCH_OBJECT:
444                 wrapper = obj;
445                 key = wrapper->bridge_id;
446                 break;
447         default:
448                 /* Hash can only work on something with a full key. */
449                 ast_assert(0);
450                 return 0;
451         }
452         return ast_str_hash(key);
453 }
454
455 static int bridges_channel_sort_fn(const void *obj_left, const void *obj_right, const int flags)
456 {
457         const struct stasis_app_bridge_channel_wrapper *left = obj_left;
458         const struct stasis_app_bridge_channel_wrapper *right = obj_right;
459         const char *right_key = obj_right;
460         int cmp;
461
462         switch (flags & OBJ_SEARCH_MASK) {
463         case OBJ_SEARCH_OBJECT:
464                 right_key = right->bridge_id;
465                 /* Fall through */
466         case OBJ_SEARCH_KEY:
467                 cmp = strcmp(left->bridge_id, right_key);
468                 break;
469         case OBJ_SEARCH_PARTIAL_KEY:
470                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
471                 break;
472         default:
473                 /* Sort can only work on something with a full or partial key. */
474                 ast_assert(0);
475                 cmp = 0;
476                 break;
477         }
478         return cmp;
479 }
480
481 /*! Removes the bridge to music on hold channel link */
482 static void remove_bridge_moh(char *bridge_id)
483 {
484         ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
485         ast_free(bridge_id);
486 }
487
488 /*! After bridge failure callback for moh channels */
489 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
490 {
491         char *bridge_id = data;
492
493         remove_bridge_moh(bridge_id);
494 }
495
496 /*! After bridge callback for moh channels */
497 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
498 {
499         char *bridge_id = data;
500
501         remove_bridge_moh(bridge_id);
502 }
503
504 /*! Request a bridge MOH channel */
505 static struct ast_channel *prepare_bridge_moh_channel(void)
506 {
507         RAII_VAR(struct ast_format_cap *, cap, NULL, ao2_cleanup);
508
509         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
510         if (!cap) {
511                 return NULL;
512         }
513
514         ast_format_cap_append(cap, ast_format_slin, 0);
515
516         return ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL);
517 }
518
519 /*! Provides the moh channel with a thread so it can actually play its music */
520 static void *moh_channel_thread(void *data)
521 {
522         struct ast_channel *moh_channel = data;
523
524         while (!ast_safe_sleep(moh_channel, 1000)) {
525         }
526
527         ast_moh_stop(moh_channel);
528         ast_hangup(moh_channel);
529
530         return NULL;
531 }
532
533 /*!
534  * \internal
535  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
536  *
537  * \param bridge Which bridge this moh channel exists for
538  *
539  * \retval NULL if the channel could not be created, pushed, or linked
540  * \retval Reference to the channel on success
541  */
542 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
543 {
544         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
545         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
546         struct ast_channel *chan;
547         pthread_t threadid;
548
549         if (!bridge_id) {
550                 return NULL;
551         }
552
553         chan = prepare_bridge_moh_channel();
554         if (!chan) {
555                 return NULL;
556         }
557
558         if (stasis_app_channel_unreal_set_internal(chan)) {
559                 ast_hangup(chan);
560                 return NULL;
561         }
562
563         /* The after bridge callback assumes responsibility of the bridge_id. */
564         if (ast_bridge_set_after_callback(chan,
565                 moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
566                 ast_hangup(chan);
567                 return NULL;
568         }
569         bridge_id = NULL;
570
571         if (ast_unreal_channel_push_to_bridge(chan, bridge,
572                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
573                 ast_hangup(chan);
574                 return NULL;
575         }
576
577         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
578                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
579         if (!new_wrapper) {
580                 ast_hangup(chan);
581                 return NULL;
582         }
583
584         if (ast_string_field_init(new_wrapper, 32)) {
585                 ast_hangup(chan);
586                 return NULL;
587         }
588         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
589         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
590
591         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
592                 ast_hangup(chan);
593                 return NULL;
594         }
595
596         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
597                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
598                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
599                 ast_hangup(chan);
600                 return NULL;
601         }
602
603         return chan;
604 }
605
606 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
607 {
608         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
609
610         {
611                 SCOPED_AO2LOCK(lock, app_bridges_moh);
612
613                 moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
614                 if (!moh_wrapper) {
615                         return bridge_moh_create(bridge);
616                 }
617         }
618
619         return ast_channel_get_by_name(moh_wrapper->channel_id);
620 }
621
622 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
623 {
624         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
625         struct ast_channel *chan;
626
627         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
628         if (!moh_wrapper) {
629                 return -1;
630         }
631
632         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
633         if (!chan) {
634                 return -1;
635         }
636
637         ast_moh_stop(chan);
638         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
639         ao2_cleanup(chan);
640
641         return 0;
642 }
643
644 /*! Removes the bridge to playback channel link */
645 static void remove_bridge_playback(char *bridge_id)
646 {
647         struct stasis_app_bridge_channel_wrapper *wrapper;
648         struct stasis_app_control *control;
649
650         wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
651
652         if (wrapper) {
653                 control = stasis_app_control_find_by_channel_id(wrapper->channel_id);
654                 if (control) {
655                         ao2_unlink(app_controls, control);
656                         ao2_ref(control, -1);
657                 }
658                 ao2_ref(wrapper, -1);
659         }
660         ast_free(bridge_id);
661 }
662
663 static void playback_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
664 {
665         char *bridge_id = data;
666
667         remove_bridge_playback(bridge_id);
668 }
669
670 static void playback_after_bridge_cb(struct ast_channel *chan, void *data)
671 {
672         char *bridge_id = data;
673
674         remove_bridge_playback(bridge_id);
675 }
676
677 int stasis_app_bridge_playback_channel_add(struct ast_bridge *bridge,
678         struct ast_channel *chan,
679         struct stasis_app_control *control)
680 {
681         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
682         char *bridge_id = ast_strdup(bridge->uniqueid);
683
684         if (!bridge_id) {
685                 return -1;
686         }
687
688         if (ast_bridge_set_after_callback(chan,
689                 playback_after_bridge_cb, playback_after_bridge_cb_failed, bridge_id)) {
690                 ast_free(bridge_id);
691                 return -1;
692         }
693
694         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
695                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
696         if (!new_wrapper) {
697                 return -1;
698         }
699
700         if (ast_string_field_init(new_wrapper, 32)) {
701                 return -1;
702         }
703
704         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
705         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
706
707         if (!ao2_link(app_bridges_playback, new_wrapper)) {
708                 return -1;
709         }
710
711         ao2_link(app_controls, control);
712         return 0;
713 }
714
715 void stasis_app_bridge_playback_channel_remove(char *bridge_id,
716         struct stasis_app_control *control)
717 {
718         struct stasis_app_bridge_channel_wrapper *wrapper;
719
720         wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
721         if (wrapper) {
722                 /* If wrapper is not found, then that means the after bridge callback has been
723                  * called or is in progress. No need to unlink the control here since that has
724                  * been done or is about to be done in the after bridge callback
725                  */
726                 ao2_unlink(app_controls, control);
727                 ao2_ref(wrapper, -1);
728         }
729 }
730
731 struct ast_channel *stasis_app_bridge_playback_channel_find(struct ast_bridge *bridge)
732 {
733         struct stasis_app_bridge_channel_wrapper *playback_wrapper;
734         struct ast_channel *chan;
735
736         playback_wrapper = ao2_find(app_bridges_playback, bridge->uniqueid, OBJ_SEARCH_KEY);
737         if (!playback_wrapper) {
738                 return NULL;
739         }
740
741         chan = ast_channel_get_by_name(playback_wrapper->channel_id);
742         ao2_ref(playback_wrapper, -1);
743         return chan;
744 }
745
746 struct ast_bridge *stasis_app_bridge_find_by_id(
747         const char *bridge_id)
748 {
749         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
750 }
751
752
753 /*!
754  * \brief In addition to running ao2_cleanup(), this function also removes the
755  * object from the app_controls container.
756  */
757 static void control_unlink(struct stasis_app_control *control)
758 {
759         if (!control) {
760                 return;
761         }
762
763         ao2_unlink(app_controls, control);
764         ao2_cleanup(control);
765 }
766
767 static struct ast_bridge *bridge_create_common(const char *type, const char *name, const char *id, int invisible)
768 {
769         struct ast_bridge *bridge;
770         char *requested_type, *requested_types = ast_strdupa(S_OR(type, "mixing"));
771         int capabilities = 0;
772         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
773                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
774                 | AST_BRIDGE_FLAG_TRANSFER_BRIDGE_ONLY;
775
776         if (invisible) {
777                 flags |= AST_BRIDGE_FLAG_INVISIBLE;
778         }
779
780         while ((requested_type = strsep(&requested_types, ","))) {
781                 requested_type = ast_strip(requested_type);
782
783                 if (!strcmp(requested_type, "mixing")) {
784                         capabilities |= STASIS_BRIDGE_MIXING_CAPABILITIES;
785                         flags |= AST_BRIDGE_FLAG_SMART;
786                 } else if (!strcmp(requested_type, "holding")) {
787                         capabilities |= AST_BRIDGE_CAPABILITY_HOLDING;
788                 } else if (!strcmp(requested_type, "dtmf_events") ||
789                         !strcmp(requested_type, "proxy_media")) {
790                         capabilities &= ~AST_BRIDGE_CAPABILITY_NATIVE;
791                 }
792         }
793
794         if (!capabilities
795                 /* Holding and mixing capabilities don't mix. */
796                 || ((capabilities & AST_BRIDGE_CAPABILITY_HOLDING)
797                         && (capabilities & (STASIS_BRIDGE_MIXING_CAPABILITIES)))) {
798                 return NULL;
799         }
800
801         bridge = bridge_stasis_new(capabilities, flags, name, id);
802         if (bridge) {
803                 if (!ao2_link(app_bridges, bridge)) {
804                         ast_bridge_destroy(bridge, 0);
805                         bridge = NULL;
806                 }
807         }
808         return bridge;
809 }
810
811 struct ast_bridge *stasis_app_bridge_create(const char *type, const char *name, const char *id)
812 {
813         return bridge_create_common(type, name, id, 0);
814 }
815
816 struct ast_bridge *stasis_app_bridge_create_invisible(const char *type, const char *name, const char *id)
817 {
818         return bridge_create_common(type, name, id, 1);
819 }
820
821 void stasis_app_bridge_destroy(const char *bridge_id)
822 {
823         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
824         if (!bridge) {
825                 return;
826         }
827         ao2_unlink(app_bridges, bridge);
828         ast_bridge_destroy(bridge, 0);
829 }
830
831 struct replace_channel_store {
832         struct ast_channel_snapshot *snapshot;
833         char *app;
834 };
835
836 static void replace_channel_destroy(void *obj)
837 {
838         struct replace_channel_store *replace = obj;
839
840         ao2_cleanup(replace->snapshot);
841         ast_free(replace->app);
842         ast_free(replace);
843 }
844
845 static const struct ast_datastore_info replace_channel_store_info = {
846         .type = "replace-channel-store",
847         .destroy = replace_channel_destroy,
848 };
849
850 static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create)
851 {
852         struct ast_datastore *datastore;
853
854         SCOPED_CHANNELLOCK(lock, chan);
855         datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL);
856         if (!datastore) {
857                 if (no_create) {
858                         return NULL;
859                 }
860
861                 datastore = ast_datastore_alloc(&replace_channel_store_info, NULL);
862                 if (!datastore) {
863                         return NULL;
864                 }
865                 ast_channel_datastore_add(chan, datastore);
866         }
867
868         if (!datastore->data) {
869                 datastore->data = ast_calloc(1, sizeof(struct replace_channel_store));
870         }
871         return datastore->data;
872 }
873
874 int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
875 {
876         struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
877
878         if (!replace) {
879                 return -1;
880         }
881
882         ao2_replace(replace->snapshot, replace_snapshot);
883         return 0;
884 }
885
886 int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app)
887 {
888         struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
889
890         if (!replace) {
891                 return -1;
892         }
893
894         ast_free(replace->app);
895         replace->app = NULL;
896
897         if (replace_app) {
898                 replace->app = ast_strdup(replace_app);
899                 if (!replace->app) {
900                         return -1;
901                 }
902         }
903
904         return 0;
905 }
906
907 static struct ast_channel_snapshot *get_replace_channel_snapshot(struct ast_channel *chan)
908 {
909         struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
910         struct ast_channel_snapshot *replace_channel_snapshot;
911
912         if (!replace) {
913                 return NULL;
914         }
915
916         replace_channel_snapshot = replace->snapshot;
917         replace->snapshot = NULL;
918
919         return replace_channel_snapshot;
920 }
921
922 char *app_get_replace_channel_app(struct ast_channel *chan)
923 {
924         struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
925         char *replace_channel_app;
926
927         if (!replace) {
928                 return NULL;
929         }
930
931         replace_channel_app = replace->app;
932         replace->app = NULL;
933
934         return replace_channel_app;
935 }
936
937 static void start_message_blob_dtor(void *obj)
938 {
939         struct start_message_blob *payload = obj;
940
941         ao2_cleanup(payload->channel);
942         ao2_cleanup(payload->replace_channel);
943         ast_json_unref(payload->blob);
944 }
945
946 static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app *app,
947         int argc, char *argv[], struct ast_channel_snapshot *snapshot,
948         struct ast_channel_snapshot *replace_channel_snapshot)
949 {
950         RAII_VAR(struct ast_json *, json_blob, NULL, ast_json_unref);
951         struct ast_json *json_args;
952         RAII_VAR(struct start_message_blob *, payload, NULL, ao2_cleanup);
953         struct stasis_message *msg;
954         int i;
955
956         if (app_subscribe_channel(app, chan)) {
957                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
958                         app_name(app), ast_channel_name(chan));
959                 return -1;
960         }
961
962         payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
963         if (!payload) {
964                 ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
965                 return -1;
966         }
967
968         payload->channel = ao2_bump(snapshot);
969         payload->replace_channel = ao2_bump(replace_channel_snapshot);
970
971         json_blob = ast_json_pack("{s: s, s: o, s: []}",
972                 "app", app_name(app),
973                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
974                 "args");
975         if (!json_blob) {
976                 ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
977                 return -1;
978         }
979
980         /* Append arguments to args array */
981         json_args = ast_json_object_get(json_blob, "args");
982         ast_assert(json_args != NULL);
983         for (i = 0; i < argc; ++i) {
984                 int r = ast_json_array_append(json_args,
985                                               ast_json_string_create(argv[i]));
986                 if (r != 0) {
987                         ast_log(LOG_ERROR, "Error appending to StasisStart message\n");
988                         return -1;
989                 }
990         }
991
992         payload->blob = ast_json_ref(json_blob);
993
994         msg = stasis_message_create(start_message_type(), payload);
995         if (!msg) {
996                 ast_log(LOG_ERROR, "Error sending StasisStart message\n");
997                 return -1;
998         }
999
1000         if (replace_channel_snapshot) {
1001                 app_unsubscribe_channel_id(app, replace_channel_snapshot->uniqueid);
1002         }
1003         stasis_publish(ast_app_get_topic(app), msg);
1004         ao2_ref(msg, -1);
1005         return 0;
1006 }
1007
1008 static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
1009         int argc, char *argv[])
1010 {
1011         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
1012         RAII_VAR(struct ast_channel_snapshot *, replace_channel_snapshot,
1013                 NULL, ao2_cleanup);
1014
1015         ast_assert(chan != NULL);
1016
1017         replace_channel_snapshot = get_replace_channel_snapshot(chan);
1018
1019         /* Set channel info */
1020         ast_channel_lock(chan);
1021         snapshot = ast_channel_snapshot_create(chan);
1022         ast_channel_unlock(chan);
1023         if (!snapshot) {
1024                 return -1;
1025         }
1026         return send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot);
1027 }
1028
1029 static void remove_masquerade_store(struct ast_channel *chan);
1030
1031 int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
1032 {
1033         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
1034         struct ast_json *blob;
1035         struct stasis_message *msg;
1036
1037         if (sanitize && sanitize->channel
1038                 && sanitize->channel(chan)) {
1039                 return 0;
1040         }
1041
1042         blob = ast_json_pack("{s: s}", "app", app_name(app));
1043         if (!blob) {
1044                 ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n");
1045                 return -1;
1046         }
1047
1048         remove_masquerade_store(chan);
1049         app_unsubscribe_channel(app, chan);
1050         msg = ast_channel_blob_create(chan, end_message_type(), blob);
1051         if (msg) {
1052                 stasis_publish(ast_app_get_topic(app), msg);
1053         }
1054         ao2_cleanup(msg);
1055         ast_json_unref(blob);
1056
1057         return 0;
1058 }
1059
1060 static int masq_match_cb(void *obj, void *data, int flags)
1061 {
1062         struct stasis_app_control *control = obj;
1063         struct ast_channel *chan = data;
1064
1065         if (!strcmp(ast_channel_uniqueid(chan),
1066                 stasis_app_control_get_channel_id(control))) {
1067                 return CMP_MATCH;
1068         }
1069
1070         return 0;
1071 }
1072
1073 static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1074 {
1075         struct stasis_app_control *control;
1076
1077         /* find control */
1078         control = ao2_callback(app_controls, 0, masq_match_cb, old_chan);
1079         if (!control) {
1080                 ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
1081                 return;
1082         }
1083
1084         /* send the StasisEnd message to the app */
1085         stasis_app_channel_set_stasis_end_published(new_chan);
1086         app_send_end_msg(control_app(control), new_chan);
1087
1088         /* remove the datastore */
1089         remove_masquerade_store(old_chan);
1090
1091         ao2_cleanup(control);
1092 }
1093
1094 static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1095 {
1096         RAII_VAR(struct ast_channel_snapshot *, new_snapshot, NULL, ao2_cleanup);
1097         RAII_VAR(struct ast_channel_snapshot *, old_snapshot, NULL, ao2_cleanup);
1098         struct stasis_app_control *control;
1099
1100         /* At this point, new_chan is the channel pointer that is in Stasis() and
1101          * has the unknown channel's name in it while old_chan is the channel pointer
1102          * that is not in Stasis(), but has the guts of the channel that Stasis() knows
1103          * about */
1104
1105         /* grab a snapshot for the channel that is jumping into Stasis() */
1106         new_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
1107         if (!new_snapshot) {
1108                 ast_log(LOG_ERROR, "Could not get snapshot for masquerading channel\n");
1109                 return;
1110         }
1111
1112         /* grab a snapshot for the channel that has been kicked out of Stasis() */
1113         old_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(old_chan));
1114         if (!old_snapshot) {
1115                 ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
1116                 return;
1117         }
1118
1119         /* find, unlink, and relink control since the channel has a new name and
1120          * its hash has likely changed */
1121         control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, new_chan);
1122         if (!control) {
1123                 ast_log(LOG_ERROR, "Could not find control for masquerading channel\n");
1124                 return;
1125         }
1126         ao2_link(app_controls, control);
1127
1128
1129         /* send the StasisStart with replace_channel to the app */
1130         send_start_msg_snapshots(new_chan, control_app(control), 0, NULL, new_snapshot,
1131                 old_snapshot);
1132         /* send the StasisEnd message to the app */
1133         app_send_end_msg(control_app(control), old_chan);
1134
1135         ao2_cleanup(control);
1136 }
1137
1138 static const struct ast_datastore_info masquerade_store_info = {
1139         .type = "stasis-masqerade",
1140         .chan_fixup = channel_stolen_cb,
1141         .chan_breakdown = channel_replaced_cb,
1142 };
1143
1144 static int has_masquerade_store(struct ast_channel *chan)
1145 {
1146         SCOPED_CHANNELLOCK(lock, chan);
1147         return !!ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
1148 }
1149
1150 static int add_masquerade_store(struct ast_channel *chan)
1151 {
1152         struct ast_datastore *datastore;
1153
1154         SCOPED_CHANNELLOCK(lock, chan);
1155         if (ast_channel_datastore_find(chan, &masquerade_store_info, NULL)) {
1156                 return 0;
1157         }
1158
1159         datastore = ast_datastore_alloc(&masquerade_store_info, NULL);
1160         if (!datastore) {
1161                 return -1;
1162         }
1163
1164         ast_channel_datastore_add(chan, datastore);
1165
1166         return 0;
1167 }
1168
1169 static void remove_masquerade_store(struct ast_channel *chan)
1170 {
1171         struct ast_datastore *datastore;
1172
1173         SCOPED_CHANNELLOCK(lock, chan);
1174         datastore = ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
1175         if (!datastore) {
1176                 return;
1177         }
1178
1179         ast_channel_datastore_remove(chan, datastore);
1180         ast_datastore_free(datastore);
1181 }
1182
1183 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
1184 {
1185         while (!control_is_done(control)) {
1186                 int command_count;
1187                 command_count = control_dispatch_all(control, chan);
1188
1189                 ao2_lock(control);
1190
1191                 if (control_command_count(control)) {
1192                         /* If the command queue isn't empty, something added to the queue before it was locked. */
1193                         ao2_unlock(control);
1194                         continue;
1195                 }
1196
1197                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
1198                         control_mark_done(control);
1199                         ao2_unlock(control);
1200                         break;
1201                 }
1202                 ao2_unlock(control);
1203         }
1204 }
1205
1206 int stasis_app_control_is_done(struct stasis_app_control *control)
1207 {
1208         return control_is_done(control);
1209 }
1210
1211 void stasis_app_control_flush_queue(struct stasis_app_control *control)
1212 {
1213         control_flush_queue(control);
1214 }
1215
1216 struct ast_datastore_info set_end_published_info = {
1217         .type = "stasis_end_published",
1218 };
1219
1220 void stasis_app_channel_set_stasis_end_published(struct ast_channel *chan)
1221 {
1222         struct ast_datastore *datastore;
1223
1224         datastore = ast_datastore_alloc(&set_end_published_info, NULL);
1225         if (datastore) {
1226                 ast_channel_lock(chan);
1227                 ast_channel_datastore_add(chan, datastore);
1228                 ast_channel_unlock(chan);
1229         }
1230 }
1231
1232 int stasis_app_channel_is_stasis_end_published(struct ast_channel *chan)
1233 {
1234         struct ast_datastore *datastore;
1235
1236         ast_channel_lock(chan);
1237         datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
1238         ast_channel_unlock(chan);
1239
1240         return datastore ? 1 : 0;
1241 }
1242
1243 static void remove_stasis_end_published(struct ast_channel *chan)
1244 {
1245         struct ast_datastore *datastore;
1246
1247         ast_channel_lock(chan);
1248         datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
1249         if (datastore) {
1250                 ast_channel_datastore_remove(chan, datastore);
1251                 ast_datastore_free(datastore);
1252         }
1253         ast_channel_unlock(chan);
1254 }
1255
1256 /*! /brief Stasis dialplan application callback */
1257 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
1258                     char *argv[])
1259 {
1260         SCOPED_MODULE_USE(ast_module_info->self);
1261
1262         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1263         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
1264         struct ast_bridge *bridge = NULL;
1265         int res = 0;
1266         int needs_depart;
1267
1268         ast_assert(chan != NULL);
1269
1270         /* Just in case there's a lingering indication that the channel has had a stasis
1271          * end published on it, remove that now.
1272          */
1273         remove_stasis_end_published(chan);
1274
1275         if (!apps_registry) {
1276                 return -1;
1277         }
1278
1279         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1280         if (!app) {
1281                 ast_log(LOG_ERROR,
1282                         "Stasis app '%s' not registered\n", app_name);
1283                 return -1;
1284         }
1285         if (!app_is_active(app)) {
1286                 ast_log(LOG_ERROR,
1287                         "Stasis app '%s' not active\n", app_name);
1288                 return -1;
1289         }
1290
1291         control = control_create(chan, app);
1292         if (!control) {
1293                 ast_log(LOG_ERROR, "Allocated failed\n");
1294                 return -1;
1295         }
1296         ao2_link(app_controls, control);
1297
1298         if (add_masquerade_store(chan)) {
1299                 ast_log(LOG_ERROR, "Failed to attach masquerade detector\n");
1300                 return -1;
1301         }
1302
1303         res = send_start_msg(app, chan, argc, argv);
1304         if (res != 0) {
1305                 ast_log(LOG_ERROR,
1306                         "Error sending start message to '%s'\n", app_name);
1307                 remove_masquerade_store(chan);
1308                 return -1;
1309         }
1310
1311         /* Pull queued prestart commands and execute */
1312         control_prestart_dispatch_all(control, chan);
1313
1314         while (!control_is_done(control)) {
1315                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
1316                 int r;
1317                 int command_count;
1318                 RAII_VAR(struct ast_bridge *, last_bridge, NULL, ao2_cleanup);
1319
1320                 /* Check to see if a bridge absorbed our hangup frame */
1321                 if (ast_check_hangup_locked(chan)) {
1322                         control_mark_done(control);
1323                         break;
1324                 }
1325
1326                 last_bridge = bridge;
1327                 bridge = ao2_bump(stasis_app_get_bridge(control));
1328
1329                 if (bridge != last_bridge) {
1330                         app_unsubscribe_bridge(app, last_bridge);
1331                         if (bridge) {
1332                                 app_subscribe_bridge(app, bridge);
1333                         }
1334                 }
1335
1336                 if (bridge) {
1337                         /* Bridge/dial is handling channel frames */
1338                         control_wait(control);
1339                         control_dispatch_all(control, chan);
1340                         continue;
1341                 }
1342
1343                 r = ast_waitfor(chan, MAX_WAIT_MS);
1344
1345                 if (r < 0) {
1346                         ast_debug(3, "%s: Poll error\n",
1347                                   ast_channel_uniqueid(chan));
1348                         control_mark_done(control);
1349                         break;
1350                 }
1351
1352                 command_count = control_dispatch_all(control, chan);
1353
1354                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
1355                         /* Command drained the channel; wait for next frame */
1356                         continue;
1357                 }
1358
1359                 if (r == 0) {
1360                         /* Timeout */
1361                         continue;
1362                 }
1363
1364                 f = ast_read(chan);
1365                 if (!f) {
1366                         /* Continue on in the dialplan */
1367                         ast_debug(3, "%s: Hangup (no more frames)\n",
1368                                 ast_channel_uniqueid(chan));
1369                         control_mark_done(control);
1370                         break;
1371                 }
1372
1373                 if (f->frametype == AST_FRAME_CONTROL) {
1374                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
1375                                 /* Continue on in the dialplan */
1376                                 ast_debug(3, "%s: Hangup\n",
1377                                         ast_channel_uniqueid(chan));
1378                                 control_mark_done(control);
1379                                 break;
1380                         }
1381                 }
1382         }
1383
1384         ast_channel_lock(chan);
1385         needs_depart = (ast_channel_internal_bridge_channel(chan) != NULL);
1386         ast_channel_unlock(chan);
1387         if (needs_depart) {
1388                 ast_bridge_depart(chan);
1389         }
1390
1391         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
1392         ao2_cleanup(bridge);
1393
1394         /* Only publish a stasis_end event if it hasn't already been published */
1395         if (!stasis_app_channel_is_stasis_end_published(chan)) {
1396                 /* A masquerade has occurred and this message will be wrong so it
1397                  * has already been sent elsewhere. */
1398                 res = has_masquerade_store(chan) && app_send_end_msg(app, chan);
1399                 if (res != 0) {
1400                         ast_log(LOG_ERROR,
1401                                 "Error sending end message to %s\n", app_name);
1402                         return res;
1403                 }
1404         } else {
1405                 remove_stasis_end_published(chan);
1406         }
1407
1408         control_flush_queue(control);
1409
1410         /* Stop any lingering silence generator */
1411         control_silence_stop_now(control);
1412
1413         /* There's an off chance that app is ready for cleanup. Go ahead
1414          * and clean up, just in case
1415          */
1416         cleanup();
1417
1418         /* The control needs to be removed from the controls container in
1419          * case a new PBX is started and ends up coming back into Stasis.
1420          */
1421         ao2_cleanup(app);
1422         app = NULL;
1423         control_unlink(control);
1424         control = NULL;
1425
1426         if (!ast_channel_pbx(chan)) {
1427                 int chan_hungup;
1428
1429                 /* The ASYNCGOTO softhangup flag may have broken the channel out of
1430                  * its bridge to run dialplan, so if there's no pbx on the channel
1431                  * let it run dialplan here. Otherwise, it will run when this
1432                  * application exits. */
1433                 ast_channel_lock(chan);
1434                 ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ASYNCGOTO);
1435                 chan_hungup = ast_check_hangup(chan);
1436                 ast_channel_unlock(chan);
1437
1438                 if (!chan_hungup) {
1439                         struct ast_pbx_args pbx_args;
1440
1441                         memset(&pbx_args, 0, sizeof(pbx_args));
1442                         pbx_args.no_hangup_chan = 1;
1443
1444                         res = ast_pbx_run_args(chan, &pbx_args);
1445                 }
1446         }
1447
1448         return res;
1449 }
1450
1451 int stasis_app_send(const char *app_name, struct ast_json *message)
1452 {
1453         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1454
1455         if (!apps_registry) {
1456                 return -1;
1457         }
1458
1459         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1460         if (!app) {
1461                 /* XXX We can do a better job handling late binding, queueing up
1462                  * the call for a few seconds to wait for the app to register.
1463                  */
1464                 ast_log(LOG_WARNING,
1465                         "Stasis app '%s' not registered\n", app_name);
1466                 return -1;
1467         }
1468         app_send(app, message);
1469         return 0;
1470 }
1471
1472 static struct stasis_app *find_app_by_name(const char *app_name)
1473 {
1474         struct stasis_app *res = NULL;
1475
1476         if (!apps_registry) {
1477                 return NULL;
1478         }
1479
1480         if (!ast_strlen_zero(app_name)) {
1481                 res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1482         }
1483
1484         if (!res) {
1485                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
1486                         app_name ? : "(null)");
1487         }
1488         return res;
1489 }
1490
1491 struct stasis_app *stasis_app_get_by_name(const char *name)
1492 {
1493         return find_app_by_name(name);
1494 }
1495
1496 static int append_name(void *obj, void *arg, int flags)
1497 {
1498         struct stasis_app *app = obj;
1499         struct ao2_container *apps = arg;
1500
1501         ast_str_container_add(apps, stasis_app_name(app));
1502         return 0;
1503 }
1504
1505 struct ao2_container *stasis_app_get_all(void)
1506 {
1507         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
1508
1509         if (!apps_registry) {
1510                 return NULL;
1511         }
1512
1513         apps = ast_str_container_alloc(1);
1514         if (!apps) {
1515                 return NULL;
1516         }
1517
1518         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
1519
1520         return ao2_bump(apps);
1521 }
1522
1523 static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
1524 {
1525         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1526
1527         if (!apps_registry) {
1528                 return -1;
1529         }
1530
1531         ao2_lock(apps_registry);
1532         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1533         if (app) {
1534                 app_update(app, handler, data);
1535         } else {
1536                 app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
1537                 if (app) {
1538                         if (all_events) {
1539                                 struct stasis_app_event_source *source;
1540                                 SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1541
1542                                 AST_LIST_TRAVERSE(&event_sources, source, next) {
1543                                         if (!source->subscribe) {
1544                                                 continue;
1545                                         }
1546
1547                                         source->subscribe(app, NULL);
1548                                 }
1549                         }
1550                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
1551                 } else {
1552                         ao2_unlock(apps_registry);
1553                         return -1;
1554                 }
1555         }
1556
1557         /* We lazily clean up the apps_registry, because it's good enough to
1558          * prevent memory leaks, and we're lazy.
1559          */
1560         cleanup();
1561         ao2_unlock(apps_registry);
1562         return 0;
1563 }
1564
1565 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
1566 {
1567         return __stasis_app_register(app_name, handler, data, 0);
1568 }
1569
1570 int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
1571 {
1572         return __stasis_app_register(app_name, handler, data, 1);
1573 }
1574
1575 void stasis_app_unregister(const char *app_name)
1576 {
1577         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1578
1579         if (!app_name) {
1580                 return;
1581         }
1582
1583         if (!apps_registry) {
1584                 return;
1585         }
1586
1587         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1588         if (!app) {
1589                 ast_log(LOG_ERROR,
1590                         "Stasis app '%s' not registered\n", app_name);
1591                 return;
1592         }
1593
1594         app_deactivate(app);
1595
1596         /* There's a decent chance that app is ready for cleanup. Go ahead
1597          * and clean up, just in case
1598          */
1599         cleanup();
1600 }
1601
1602 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
1603 {
1604         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1605         AST_LIST_INSERT_TAIL(&event_sources, obj, next);
1606         /* only need to bump the module ref on non-core sources because the
1607            core ones are [un]registered by this module. */
1608         if (!stasis_app_is_core_event_source(obj)) {
1609                 ast_module_ref(ast_module_info->self);
1610         }
1611 }
1612
1613 void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
1614 {
1615         struct stasis_app_event_source *source;
1616         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1617         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
1618                 if (source == obj) {
1619                         AST_RWLIST_REMOVE_CURRENT(next);
1620                         if (!stasis_app_is_core_event_source(obj)) {
1621                                 ast_module_unref(ast_module_info->self);
1622                         }
1623                         break;
1624                 }
1625         }
1626         AST_RWLIST_TRAVERSE_SAFE_END;
1627 }
1628
1629 /*!
1630  * \internal
1631  * \brief Convert event source data to JSON.
1632  *
1633  * Calls each event source that has a "to_json" handler allowing each
1634  * source to add data to the given JSON object.
1635  *
1636  * \param app application associated with the event source
1637  * \param json a json object to "fill"
1638  *
1639  * \retval The given json object.
1640  */
1641 static struct ast_json *app_event_sources_to_json(
1642         const struct stasis_app *app, struct ast_json *json)
1643 {
1644         struct stasis_app_event_source *source;
1645         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1646         AST_LIST_TRAVERSE(&event_sources, source, next) {
1647                 if (source->to_json) {
1648                         source->to_json(app, json);
1649                 }
1650         }
1651         return json;
1652 }
1653
1654 static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
1655 {
1656         if (!app) {
1657                 return NULL;
1658         }
1659
1660         return app_event_sources_to_json(app, app_to_json(app));
1661 }
1662
1663 struct ast_json *stasis_app_to_json(const char *app_name)
1664 {
1665         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1666
1667         return stasis_app_object_to_json(app);
1668 }
1669
1670 /*!
1671  * \internal
1672  * \brief Finds an event source that matches a uri scheme.
1673  *
1674  * Uri(s) should begin with a particular scheme that can be matched
1675  * against an event source.
1676  *
1677  * \param uri uri containing a scheme to match
1678  *
1679  * \retval an event source if found, NULL otherwise.
1680  */
1681 static struct stasis_app_event_source *app_event_source_find(const char *uri)
1682 {
1683         struct stasis_app_event_source *source;
1684         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1685         AST_LIST_TRAVERSE(&event_sources, source, next) {
1686                 if (ast_begins_with(uri, source->scheme)) {
1687                         return source;
1688                 }
1689         }
1690         return NULL;
1691 }
1692
1693 /*!
1694  * \internal
1695  * \brief Callback for subscription handling
1696  *
1697  * \param app [un]subscribing application
1698  * \param uri scheme:id of an event source
1699  * \param event_source being [un]subscribed [from]to
1700  *
1701  * \retval stasis_app_subscribe_res return code.
1702  */
1703 typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
1704         struct stasis_app *app, const char *uri,
1705         struct stasis_app_event_source *event_source);
1706
1707 /*!
1708  * \internal
1709  * \brief Subscriptions handler for application [un]subscribing.
1710  *
1711  * \param app_name Name of the application to subscribe.
1712  * \param event_source_uris URIs for the event sources to subscribe to.
1713  * \param event_sources_count Array size of event_source_uris.
1714  * \param json Optional output pointer for JSON representation of the app
1715  *             after adding the subscription.
1716  * \param handler [un]subscribe handler
1717  *
1718  * \retval stasis_app_subscribe_res return code.
1719  */
1720 static enum stasis_app_subscribe_res app_handle_subscriptions(
1721         const char *app_name, const char **event_source_uris,
1722         int event_sources_count, struct ast_json **json,
1723         app_subscription_handler handler)
1724 {
1725         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1726         int i;
1727
1728         if (!app) {
1729                 return STASIS_ASR_APP_NOT_FOUND;
1730         }
1731
1732         for (i = 0; i < event_sources_count; ++i) {
1733                 const char *uri = event_source_uris[i];
1734                 enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
1735                 struct stasis_app_event_source *event_source;
1736
1737                 if (!(event_source = app_event_source_find(uri))) {
1738                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1739                         return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1740                 }
1741
1742                 if (handler &&
1743                     ((res = handler(app, uri, event_source)))) {
1744                         return res;
1745                 }
1746         }
1747
1748         if (json) {
1749                 ast_debug(3, "%s: Successful; setting results\n", app_name);
1750                 *json = stasis_app_object_to_json(app);
1751         }
1752
1753         return STASIS_ASR_OK;
1754 }
1755
1756 enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
1757         struct ast_channel *chan)
1758 {
1759         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1760         int res;
1761
1762         if (!app) {
1763                 return STASIS_ASR_APP_NOT_FOUND;
1764         }
1765
1766         ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
1767
1768         res = app_subscribe_channel(app, chan);
1769         if (res != 0) {
1770                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
1771                         app_name, ast_channel_uniqueid(chan));
1772                 return STASIS_ASR_INTERNAL_ERROR;
1773         }
1774
1775         return STASIS_ASR_OK;
1776 }
1777
1778
1779 /*!
1780  * \internal
1781  * \brief Subscribe an app to an event source.
1782  *
1783  * \param app subscribing application
1784  * \param uri scheme:id of an event source
1785  * \param event_source being subscribed to
1786  *
1787  * \retval stasis_app_subscribe_res return code.
1788  */
1789 static enum stasis_app_subscribe_res app_subscribe(
1790         struct stasis_app *app, const char *uri,
1791         struct stasis_app_event_source *event_source)
1792 {
1793         const char *app_name = stasis_app_name(app);
1794         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1795
1796         ast_debug(3, "%s: Checking %s\n", app_name, uri);
1797
1798         if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
1799             (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
1800                 ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
1801                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1802         }
1803
1804         ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
1805
1806         if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
1807                 ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
1808                         app_name, uri);
1809                 return STASIS_ASR_INTERNAL_ERROR;
1810         }
1811
1812         return STASIS_ASR_OK;
1813 }
1814
1815 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
1816         const char **event_source_uris, int event_sources_count,
1817         struct ast_json **json)
1818 {
1819         return app_handle_subscriptions(
1820                 app_name, event_source_uris, event_sources_count,
1821                 json, app_subscribe);
1822 }
1823
1824 /*!
1825  * \internal
1826  * \brief Unsubscribe an app from an event source.
1827  *
1828  * \param app application to unsubscribe
1829  * \param uri scheme:id of an event source
1830  * \param event_source being unsubscribed from
1831  *
1832  * \retval stasis_app_subscribe_res return code.
1833  */
1834 static enum stasis_app_subscribe_res app_unsubscribe(
1835         struct stasis_app *app, const char *uri,
1836         struct stasis_app_event_source *event_source)
1837 {
1838         const char *app_name = stasis_app_name(app);
1839         const char *id = uri + strlen(event_source->scheme);
1840
1841         if (!event_source->is_subscribed ||
1842             (!event_source->is_subscribed(app, id))) {
1843                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1844         }
1845
1846         ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
1847
1848         if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
1849                 ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
1850                         app_name, uri);
1851                 return -1;
1852         }
1853         return 0;
1854 }
1855
1856 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1857         const char **event_source_uris, int event_sources_count,
1858         struct ast_json **json)
1859 {
1860         return app_handle_subscriptions(
1861                 app_name, event_source_uris, event_sources_count,
1862                 json, app_unsubscribe);
1863 }
1864
1865 enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
1866         const char *event_name,
1867         const char **source_uris, int sources_count,
1868         struct ast_json *json_variables)
1869 {
1870         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1871         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
1872         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1873         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1874         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1875         enum stasis_app_user_event_res res = STASIS_APP_USER_INTERNAL_ERROR;
1876         struct ast_json *json_value;
1877         int have_channel = 0;
1878         int i;
1879
1880         if (!app) {
1881                 ast_log(LOG_WARNING, "App %s not found\n", app_name);
1882                 return STASIS_APP_USER_APP_NOT_FOUND;
1883         }
1884
1885         if (!ast_multi_user_event_type()) {
1886                 return res;
1887         }
1888
1889         blob = json_variables;
1890         if (!blob) {
1891                 blob = ast_json_pack("{}");
1892         } else {
1893                 ast_json_ref(blob);
1894         }
1895         json_value = ast_json_string_create(event_name);
1896         if (!json_value) {
1897                 ast_log(LOG_ERROR, "unable to create json string\n");
1898                 return res;
1899         }
1900         if (ast_json_object_set(blob, "eventname", json_value)) {
1901                 ast_log(LOG_ERROR, "unable to set eventname to blob\n");
1902                 return res;
1903         }
1904
1905         multi = ast_multi_object_blob_create(blob);
1906
1907         for (i = 0; i < sources_count; ++i) {
1908                 const char *uri = source_uris[i];
1909                 void *snapshot=NULL;
1910                 enum stasis_user_multi_object_snapshot_type type;
1911
1912                 if (ast_begins_with(uri, "channel:")) {
1913                         type = STASIS_UMOS_CHANNEL;
1914                         snapshot = ast_channel_snapshot_get_latest(uri + 8);
1915                         have_channel = 1;
1916                 } else if (ast_begins_with(uri, "bridge:")) {
1917                         type = STASIS_UMOS_BRIDGE;
1918                         snapshot = ast_bridge_snapshot_get_latest(uri + 7);
1919                 } else if (ast_begins_with(uri, "endpoint:")) {
1920                         type = STASIS_UMOS_ENDPOINT;
1921                         snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
1922                 } else {
1923                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1924                         return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
1925                 }
1926                 if (!snapshot) {
1927                         ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
1928                         return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
1929                 }
1930                 ast_multi_object_blob_add(multi, type, snapshot);
1931         }
1932
1933         message = stasis_message_create(ast_multi_user_event_type(), multi);
1934         if (!message) {
1935                 ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
1936                 return res;
1937         }
1938
1939         /*
1940          * Publishing to two different topics is normally to be avoided -- except
1941          * in this case both are final destinations with no forwards (only listeners).
1942          * The message has to be delivered to the application topic for ARI, but a
1943          * copy is also delivered directly to the manager for AMI if there is a channel.
1944          */
1945         stasis_publish(ast_app_get_topic(app), message);
1946
1947         if (have_channel) {
1948                 stasis_publish(ast_manager_get_topic(), message);
1949         }
1950
1951         return STASIS_APP_USER_OK;
1952 }
1953
1954 void stasis_app_ref(void)
1955 {
1956         ast_module_ref(ast_module_info->self);
1957 }
1958
1959 void stasis_app_unref(void)
1960 {
1961         ast_module_unref(ast_module_info->self);
1962 }
1963
1964 static int unload_module(void)
1965 {
1966         stasis_app_unregister_event_sources();
1967
1968         cli_cleanup();
1969
1970         messaging_cleanup();
1971
1972         cleanup();
1973         ao2_cleanup(apps_registry);
1974         apps_registry = NULL;
1975
1976         ao2_cleanup(app_controls);
1977         app_controls = NULL;
1978
1979         ao2_cleanup(app_bridges);
1980         app_bridges = NULL;
1981
1982         ao2_cleanup(app_bridges_moh);
1983         app_bridges_moh = NULL;
1984
1985         ao2_cleanup(app_bridges_playback);
1986         app_bridges_playback = NULL;
1987
1988         stasis_app_control_shutdown();
1989
1990         STASIS_MESSAGE_TYPE_CLEANUP(end_message_type);
1991         STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
1992
1993         return 0;
1994 }
1995
1996 /* \brief Sanitization callback for channel snapshots */
1997 static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
1998 {
1999         if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
2000                 return 0;
2001         }
2002         return 1;
2003 }
2004
2005 /* \brief Sanitization callback for channels */
2006 static int channel_sanitizer(const struct ast_channel *chan)
2007 {
2008         if (!chan || !(ast_channel_tech(chan)->properties & AST_CHAN_TP_INTERNAL)) {
2009                 return 0;
2010         }
2011         return 1;
2012 }
2013
2014 /* \brief Sanitization callback for channel unique IDs */
2015 static int channel_id_sanitizer(const char *id)
2016 {
2017         RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup);
2018
2019         return channel_snapshot_sanitizer(snapshot);
2020 }
2021
2022 /* \brief Sanitization callbacks for communication to Stasis applications */
2023 struct stasis_message_sanitizer app_sanitizer = {
2024         .channel_id = channel_id_sanitizer,
2025         .channel_snapshot = channel_snapshot_sanitizer,
2026         .channel = channel_sanitizer,
2027 };
2028
2029 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
2030 {
2031         return &app_sanitizer;
2032 }
2033
2034 static const struct ast_datastore_info stasis_internal_channel_info = {
2035         .type = "stasis-internal-channel",
2036 };
2037
2038 static int set_internal_datastore(struct ast_channel *chan)
2039 {
2040         struct ast_datastore *datastore;
2041
2042         datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
2043         if (!datastore) {
2044                 datastore = ast_datastore_alloc(&stasis_internal_channel_info, NULL);
2045                 if (!datastore) {
2046                         return -1;
2047                 }
2048                 ast_channel_datastore_add(chan, datastore);
2049         }
2050         return 0;
2051 }
2052
2053 int stasis_app_channel_unreal_set_internal(struct ast_channel *chan)
2054 {
2055         struct ast_channel *outchan = NULL, *outowner = NULL;
2056         int res = 0;
2057         struct ast_unreal_pvt *unreal_pvt = ast_channel_tech_pvt(chan);
2058
2059         ao2_ref(unreal_pvt, +1);
2060         ast_unreal_lock_all(unreal_pvt, &outowner, &outchan);
2061         if (outowner) {
2062                 res |= set_internal_datastore(outowner);
2063                 ast_channel_unlock(outowner);
2064                 ast_channel_unref(outowner);
2065         }
2066         if (outchan) {
2067                 res |= set_internal_datastore(outchan);
2068                 ast_channel_unlock(outchan);
2069                 ast_channel_unref(outchan);
2070         }
2071         ao2_unlock(unreal_pvt);
2072         ao2_ref(unreal_pvt, -1);
2073         return res;
2074 }
2075
2076 int stasis_app_channel_set_internal(struct ast_channel *chan)
2077 {
2078         int res;
2079
2080         ast_channel_lock(chan);
2081         res = set_internal_datastore(chan);
2082         ast_channel_unlock(chan);
2083
2084         return res;
2085 }
2086
2087 int stasis_app_channel_is_internal(struct ast_channel *chan)
2088 {
2089         struct ast_datastore *datastore;
2090         int res = 0;
2091
2092         ast_channel_lock(chan);
2093         datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
2094         if (datastore) {
2095                 res = 1;
2096         }
2097         ast_channel_unlock(chan);
2098
2099         return res;
2100 }
2101
2102 static int load_module(void)
2103 {
2104         if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
2105                 return AST_MODULE_LOAD_DECLINE;
2106         }
2107         if (STASIS_MESSAGE_TYPE_INIT(end_message_type) != 0) {
2108                 return AST_MODULE_LOAD_DECLINE;
2109         }
2110         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
2111         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
2112         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
2113         app_bridges_moh = ao2_container_alloc_hash(
2114                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
2115                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
2116         app_bridges_playback = ao2_container_alloc_hash(
2117                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
2118                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
2119         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh || !app_bridges_playback) {
2120                 unload_module();
2121                 return AST_MODULE_LOAD_FAILURE;
2122         }
2123
2124         if (messaging_init()) {
2125                 unload_module();
2126                 return AST_MODULE_LOAD_FAILURE;
2127         }
2128
2129         if (cli_init()) {
2130                 unload_module();
2131                 return AST_MODULE_LOAD_FAILURE;
2132         }
2133
2134         bridge_stasis_init();
2135
2136         stasis_app_register_event_sources();
2137
2138         return AST_MODULE_LOAD_SUCCESS;
2139 }
2140
2141 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
2142         .load_pri = AST_MODPRI_APP_DEPEND,
2143         .support_level = AST_MODULE_SUPPORT_CORE,
2144         .load = load_module,
2145         .unload = unload_module,
2146 );