Multiple revisions 420089-420090,420097
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_endpoints.h"
65 #include "asterisk/stasis_message_router.h"
66 #include "asterisk/strings.h"
67 #include "stasis/app.h"
68 #include "stasis/control.h"
69 #include "stasis/messaging.h"
70 #include "stasis/stasis_bridge.h"
71 #include "asterisk/core_unreal.h"
72 #include "asterisk/musiconhold.h"
73 #include "asterisk/causes.h"
74 #include "asterisk/stringfields.h"
75 #include "asterisk/bridge_after.h"
76 #include "asterisk/format_cache.h"
77
78 /*! Time to wait for a frame in the application */
79 #define MAX_WAIT_MS 200
80
81 /*!
82  * \brief Number of buckets for the Stasis application hash table.  Remember to
83  * keep it a prime number!
84  */
85 #define APPS_NUM_BUCKETS 127
86
87 /*!
88  * \brief Number of buckets for the Stasis application hash table.  Remember to
89  * keep it a prime number!
90  */
91 #define CONTROLS_NUM_BUCKETS 127
92
93 /*!
94  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
95  * keep it a prime number!
96  */
97 #define BRIDGES_NUM_BUCKETS 127
98
99 /*!
100  * \brief Stasis application container.
101  */
102 struct ao2_container *apps_registry;
103
104 struct ao2_container *app_controls;
105
106 struct ao2_container *app_bridges;
107
108 struct ao2_container *app_bridges_moh;
109
110 struct ao2_container *app_bridges_playback;
111
112 const char *stasis_app_name(const struct stasis_app *app)
113 {
114         return app_name(app);
115 }
116
117 /*! AO2 hash function for \ref app */
118 static int app_hash(const void *obj, const int flags)
119 {
120         const struct stasis_app *app;
121         const char *key;
122
123         switch (flags & OBJ_SEARCH_MASK) {
124         case OBJ_SEARCH_KEY:
125                 key = obj;
126                 break;
127         case OBJ_SEARCH_OBJECT:
128                 app = obj;
129                 key = stasis_app_name(app);
130                 break;
131         default:
132                 /* Hash can only work on something with a full key. */
133                 ast_assert(0);
134                 return 0;
135         }
136         return ast_str_hash(key);
137 }
138
139 /*! AO2 comparison function for \ref app */
140 static int app_compare(void *obj, void *arg, int flags)
141 {
142         const struct stasis_app *object_left = obj;
143         const struct stasis_app *object_right = arg;
144         const char *right_key = arg;
145         int cmp;
146
147         switch (flags & OBJ_SEARCH_MASK) {
148         case OBJ_SEARCH_OBJECT:
149                 right_key = stasis_app_name(object_right);
150                 /* Fall through */
151         case OBJ_SEARCH_KEY:
152                 cmp = strcmp(stasis_app_name(object_left), right_key);
153                 break;
154         case OBJ_SEARCH_PARTIAL_KEY:
155                 /*
156                  * We could also use a partial key struct containing a length
157                  * so strlen() does not get called for every comparison instead.
158                  */
159                 cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
160                 break;
161         default:
162                 /*
163                  * What arg points to is specific to this traversal callback
164                  * and has no special meaning to astobj2.
165                  */
166                 cmp = 0;
167                 break;
168         }
169         if (cmp) {
170                 return 0;
171         }
172         /*
173          * At this point the traversal callback is identical to a sorted
174          * container.
175          */
176         return CMP_MATCH;
177 }
178
179 /*! AO2 hash function for \ref stasis_app_control */
180 static int control_hash(const void *obj, const int flags)
181 {
182         const struct stasis_app_control *control;
183         const char *key;
184
185         switch (flags & OBJ_SEARCH_MASK) {
186         case OBJ_SEARCH_KEY:
187                 key = obj;
188                 break;
189         case OBJ_SEARCH_OBJECT:
190                 control = obj;
191                 key = stasis_app_control_get_channel_id(control);
192                 break;
193         default:
194                 /* Hash can only work on something with a full key. */
195                 ast_assert(0);
196                 return 0;
197         }
198         return ast_str_hash(key);
199 }
200
201 /*! AO2 comparison function for \ref stasis_app_control */
202 static int control_compare(void *obj, void *arg, int flags)
203 {
204         const struct stasis_app_control *object_left = obj;
205         const struct stasis_app_control *object_right = arg;
206         const char *right_key = arg;
207         int cmp;
208
209         switch (flags & OBJ_SEARCH_MASK) {
210         case OBJ_SEARCH_OBJECT:
211                 right_key = stasis_app_control_get_channel_id(object_right);
212                 /* Fall through */
213         case OBJ_SEARCH_KEY:
214                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
215                 break;
216         case OBJ_SEARCH_PARTIAL_KEY:
217                 /*
218                  * We could also use a partial key struct containing a length
219                  * so strlen() does not get called for every comparison instead.
220                  */
221                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
222                 break;
223         default:
224                 /*
225                  * What arg points to is specific to this traversal callback
226                  * and has no special meaning to astobj2.
227                  */
228                 cmp = 0;
229                 break;
230         }
231         if (cmp) {
232                 return 0;
233         }
234         /*
235          * At this point the traversal callback is identical to a sorted
236          * container.
237          */
238         return CMP_MATCH;
239 }
240
241 static int cleanup_cb(void *obj, void *arg, int flags)
242 {
243         struct stasis_app *app = obj;
244
245         if (!app_is_finished(app)) {
246                 return 0;
247         }
248
249         ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
250         app_shutdown(app);
251
252         return CMP_MATCH;
253
254 }
255
256 /*!
257  * \brief Clean up any old apps that we don't need any more.
258  */
259 static void cleanup(void)
260 {
261         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
262                 cleanup_cb, NULL);
263 }
264
265 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
266 {
267         return control_create(chan, NULL);
268 }
269
270 struct stasis_app_control *stasis_app_control_find_by_channel(
271         const struct ast_channel *chan)
272 {
273         if (chan == NULL) {
274                 return NULL;
275         }
276
277         return stasis_app_control_find_by_channel_id(
278                 ast_channel_uniqueid(chan));
279 }
280
281 struct stasis_app_control *stasis_app_control_find_by_channel_id(
282         const char *channel_id)
283 {
284         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
285 }
286
287 /*! AO2 hash function for bridges container  */
288 static int bridges_hash(const void *obj, const int flags)
289 {
290         const struct ast_bridge *bridge;
291         const char *key;
292
293         switch (flags & OBJ_SEARCH_MASK) {
294         case OBJ_SEARCH_KEY:
295                 key = obj;
296                 break;
297         case OBJ_SEARCH_OBJECT:
298                 bridge = obj;
299                 key = bridge->uniqueid;
300                 break;
301         default:
302                 /* Hash can only work on something with a full key. */
303                 ast_assert(0);
304                 return 0;
305         }
306         return ast_str_hash(key);
307 }
308
309 /*! AO2 comparison function for bridges container */
310 static int bridges_compare(void *obj, void *arg, int flags)
311 {
312         const struct ast_bridge *object_left = obj;
313         const struct ast_bridge *object_right = arg;
314         const char *right_key = arg;
315         int cmp;
316
317         switch (flags & OBJ_SEARCH_MASK) {
318         case OBJ_SEARCH_OBJECT:
319                 right_key = object_right->uniqueid;
320                 /* Fall through */
321         case OBJ_SEARCH_KEY:
322                 cmp = strcmp(object_left->uniqueid, right_key);
323                 break;
324         case OBJ_SEARCH_PARTIAL_KEY:
325                 /*
326                  * We could also use a partial key struct containing a length
327                  * so strlen() does not get called for every comparison instead.
328                  */
329                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
330                 break;
331         default:
332                 /*
333                  * What arg points to is specific to this traversal callback
334                  * and has no special meaning to astobj2.
335                  */
336                 cmp = 0;
337                 break;
338         }
339         if (cmp) {
340                 return 0;
341         }
342         /*
343          * At this point the traversal callback is identical to a sorted
344          * container.
345          */
346         return CMP_MATCH;
347 }
348
349 /*!
350  *  Used with app_bridges_moh and app_bridge_control, they provide links
351  *  between bridges and channels used for ARI application purposes
352  */
353 struct stasis_app_bridge_channel_wrapper {
354         AST_DECLARE_STRING_FIELDS(
355                 AST_STRING_FIELD(channel_id);
356                 AST_STRING_FIELD(bridge_id);
357         );
358 };
359
360 static void stasis_app_bridge_channel_wrapper_destructor(void *obj)
361 {
362         struct stasis_app_bridge_channel_wrapper *wrapper = obj;
363         ast_string_field_free_memory(wrapper);
364 }
365
366 /*! AO2 hash function for the bridges moh container */
367 static int bridges_channel_hash_fn(const void *obj, const int flags)
368 {
369         const struct stasis_app_bridge_channel_wrapper *wrapper;
370         const char *key;
371
372         switch (flags & OBJ_SEARCH_MASK) {
373         case OBJ_SEARCH_KEY:
374                 key = obj;
375                 break;
376         case OBJ_SEARCH_OBJECT:
377                 wrapper = obj;
378                 key = wrapper->bridge_id;
379                 break;
380         default:
381                 /* Hash can only work on something with a full key. */
382                 ast_assert(0);
383                 return 0;
384         }
385         return ast_str_hash(key);
386 }
387
388 static int bridges_channel_sort_fn(const void *obj_left, const void *obj_right, const int flags)
389 {
390         const struct stasis_app_bridge_channel_wrapper *left = obj_left;
391         const struct stasis_app_bridge_channel_wrapper *right = obj_right;
392         const char *right_key = obj_right;
393         int cmp;
394
395         switch (flags & OBJ_SEARCH_MASK) {
396         case OBJ_SEARCH_OBJECT:
397                 right_key = right->bridge_id;
398                 /* Fall through */
399         case OBJ_SEARCH_KEY:
400                 cmp = strcmp(left->bridge_id, right_key);
401                 break;
402         case OBJ_SEARCH_PARTIAL_KEY:
403                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
404                 break;
405         default:
406                 /* Sort can only work on something with a full or partial key. */
407                 ast_assert(0);
408                 cmp = 0;
409                 break;
410         }
411         return cmp;
412 }
413
414 /*! Removes the bridge to music on hold channel link */
415 static void remove_bridge_moh(char *bridge_id)
416 {
417         ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
418         ast_free(bridge_id);
419 }
420
421 /*! After bridge failure callback for moh channels */
422 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
423 {
424         char *bridge_id = data;
425
426         remove_bridge_moh(bridge_id);
427 }
428
429 /*! After bridge callback for moh channels */
430 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
431 {
432         char *bridge_id = data;
433
434         remove_bridge_moh(bridge_id);
435 }
436
437 /*! Request a bridge MOH channel */
438 static struct ast_channel *prepare_bridge_moh_channel(void)
439 {
440         RAII_VAR(struct ast_format_cap *, cap, NULL, ao2_cleanup);
441
442         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
443         if (!cap) {
444                 return NULL;
445         }
446
447         ast_format_cap_append(cap, ast_format_slin, 0);
448
449         return ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL);
450 }
451
452 /*! Provides the moh channel with a thread so it can actually play its music */
453 static void *moh_channel_thread(void *data)
454 {
455         struct ast_channel *moh_channel = data;
456
457         while (!ast_safe_sleep(moh_channel, 1000)) {
458         }
459
460         ast_moh_stop(moh_channel);
461         ast_hangup(moh_channel);
462
463         return NULL;
464 }
465
466 /*!
467  * \internal
468  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
469  *
470  * \param bridge Which bridge this moh channel exists for
471  *
472  * \retval NULL if the channel could not be created, pushed, or linked
473  * \retval Reference to the channel on success
474  */
475 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
476 {
477         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
478         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
479         struct ast_channel *chan;
480         pthread_t threadid;
481
482         if (!bridge_id) {
483                 return NULL;
484         }
485
486         chan = prepare_bridge_moh_channel();
487         if (!chan) {
488                 return NULL;
489         }
490
491         /* The after bridge callback assumes responsibility of the bridge_id. */
492         if (ast_bridge_set_after_callback(chan,
493                 moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
494                 ast_hangup(chan);
495                 return NULL;
496         }
497         bridge_id = NULL;
498
499         if (ast_unreal_channel_push_to_bridge(chan, bridge,
500                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
501                 ast_hangup(chan);
502                 return NULL;
503         }
504
505         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
506                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
507         if (!new_wrapper) {
508                 ast_hangup(chan);
509                 return NULL;
510         }
511
512         if (ast_string_field_init(new_wrapper, 32)) {
513                 ast_hangup(chan);
514                 return NULL;
515         }
516         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
517         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
518
519         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
520                 ast_hangup(chan);
521                 return NULL;
522         }
523
524         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
525                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
526                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
527                 ast_hangup(chan);
528                 return NULL;
529         }
530
531         return chan;
532 }
533
534 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
535 {
536         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
537
538         {
539                 SCOPED_AO2LOCK(lock, app_bridges_moh);
540
541                 moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
542                 if (!moh_wrapper) {
543                         return bridge_moh_create(bridge);
544                 }
545         }
546
547         return ast_channel_get_by_name(moh_wrapper->channel_id);
548 }
549
550 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
551 {
552         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
553         struct ast_channel *chan;
554
555         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
556         if (!moh_wrapper) {
557                 return -1;
558         }
559
560         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
561         if (!chan) {
562                 return -1;
563         }
564
565         ast_moh_stop(chan);
566         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
567         ao2_cleanup(chan);
568
569         return 0;
570 }
571
572 /*! Removes the bridge to playback channel link */
573 static void remove_bridge_playback(char *bridge_id)
574 {
575         struct stasis_app_bridge_channel_wrapper *wrapper;
576         struct stasis_app_control *control;
577
578         wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
579
580         if (wrapper) {
581                 control = stasis_app_control_find_by_channel_id(wrapper->channel_id);
582                 if (control) {
583                         ao2_unlink(app_controls, control);
584                         ao2_ref(control, -1);
585                 }
586                 ao2_ref(wrapper, -1);
587         }
588         ast_free(bridge_id);
589 }
590
591 static void playback_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
592 {
593         char *bridge_id = data;
594
595         remove_bridge_playback(bridge_id);
596 }
597
598 static void playback_after_bridge_cb(struct ast_channel *chan, void *data)
599 {
600         char *bridge_id = data;
601
602         remove_bridge_playback(bridge_id);
603 }
604
605 int stasis_app_bridge_playback_channel_add(struct ast_bridge *bridge,
606         struct ast_channel *chan,
607         struct stasis_app_control *control)
608 {
609         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
610         char *bridge_id = ast_strdup(bridge->uniqueid);
611
612         if (!bridge_id) {
613                 return -1;
614         }
615
616         if (ast_bridge_set_after_callback(chan,
617                 playback_after_bridge_cb, playback_after_bridge_cb_failed, bridge_id)) {
618                 ast_free(bridge_id);
619                 return -1;
620         }
621
622         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
623                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
624         if (!new_wrapper) {
625                 return -1;
626         }
627
628         if (ast_string_field_init(new_wrapper, 32)) {
629                 return -1;
630         }
631
632         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
633         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
634
635         if (!ao2_link(app_bridges_playback, new_wrapper)) {
636                 return -1;
637         }
638
639         ao2_link(app_controls, control);
640         return 0;
641 }
642
643 struct ast_channel *stasis_app_bridge_playback_channel_find(struct ast_bridge *bridge)
644 {
645         struct stasis_app_bridge_channel_wrapper *playback_wrapper;
646         struct ast_channel *chan;
647
648         playback_wrapper = ao2_find(app_bridges_playback, bridge->uniqueid, OBJ_SEARCH_KEY);
649         if (!playback_wrapper) {
650                 return NULL;
651         }
652
653         chan = ast_channel_get_by_name(playback_wrapper->channel_id);
654         ao2_ref(playback_wrapper, -1);
655         return chan;
656 }
657
658 struct ast_bridge *stasis_app_bridge_find_by_id(
659         const char *bridge_id)
660 {
661         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
662 }
663
664
665 /*!
666  * \brief In addition to running ao2_cleanup(), this function also removes the
667  * object from the app_controls container.
668  */
669 static void control_unlink(struct stasis_app_control *control)
670 {
671         if (!control) {
672                 return;
673         }
674
675         ao2_unlink(app_controls, control);
676         ao2_cleanup(control);
677 }
678
679 struct ast_bridge *stasis_app_bridge_create(const char *type, const char *name, const char *id)
680 {
681         struct ast_bridge *bridge;
682         char *requested_type, *requested_types = ast_strdupa(S_OR(type, "mixing"));
683         int capabilities = 0;
684         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
685                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
686                 | AST_BRIDGE_FLAG_TRANSFER_BRIDGE_ONLY;
687
688         while ((requested_type = strsep(&requested_types, ","))) {
689                 requested_type = ast_strip(requested_type);
690
691                 if (!strcmp(requested_type, "mixing")) {
692                         capabilities |= STASIS_BRIDGE_MIXING_CAPABILITIES;
693                         flags |= AST_BRIDGE_FLAG_SMART;
694                 } else if (!strcmp(requested_type, "holding")) {
695                         capabilities |= AST_BRIDGE_CAPABILITY_HOLDING;
696                 } else if (!strcmp(requested_type, "dtmf_events") ||
697                         !strcmp(requested_type, "proxy_media")) {
698                         capabilities &= ~AST_BRIDGE_CAPABILITY_NATIVE;
699                 }
700         }
701
702         if (!capabilities
703                 /* Holding and mixing capabilities don't mix. */
704                 || ((capabilities & AST_BRIDGE_CAPABILITY_HOLDING)
705                         && (capabilities & (STASIS_BRIDGE_MIXING_CAPABILITIES)))) {
706                 return NULL;
707         }
708
709         bridge = bridge_stasis_new(capabilities, flags, name, id);
710         if (bridge) {
711                 if (!ao2_link(app_bridges, bridge)) {
712                         ast_bridge_destroy(bridge, 0);
713                         bridge = NULL;
714                 }
715         }
716         return bridge;
717 }
718
719 void stasis_app_bridge_destroy(const char *bridge_id)
720 {
721         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
722         if (!bridge) {
723                 return;
724         }
725         ao2_unlink(app_bridges, bridge);
726         ast_bridge_destroy(bridge, 0);
727 }
728
729 static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
730         int argc, char *argv[])
731 {
732         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
733         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
734
735         struct ast_json *json_args;
736         int i;
737         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
738
739         ast_assert(chan != NULL);
740
741         /* Set channel info */
742         ast_channel_lock(chan);
743         snapshot = ast_channel_snapshot_create(chan);
744         ast_channel_unlock(chan);
745         if (!snapshot) {
746                 return -1;
747         }
748
749         if (sanitize && sanitize->channel_snapshot
750                 && sanitize->channel_snapshot(snapshot)) {
751                 return 0;
752         }
753
754         msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
755                 "type", "StasisStart",
756                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
757                 "args",
758                 "channel", ast_channel_snapshot_to_json(snapshot, NULL));
759         if (!msg) {
760                 return -1;
761         }
762
763         /* Append arguments to args array */
764         json_args = ast_json_object_get(msg, "args");
765         ast_assert(json_args != NULL);
766         for (i = 0; i < argc; ++i) {
767                 int r = ast_json_array_append(json_args,
768                                               ast_json_string_create(argv[i]));
769                 if (r != 0) {
770                         ast_log(LOG_ERROR, "Error appending start message\n");
771                         return -1;
772                 }
773         }
774
775         app_send(app, msg);
776         return 0;
777 }
778
779 static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
780 {
781         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
782         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
783         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
784
785         ast_assert(chan != NULL);
786
787         /* Set channel info */
788         ast_channel_lock(chan);
789         snapshot = ast_channel_snapshot_create(chan);
790         ast_channel_unlock(chan);
791         if (snapshot == NULL) {
792                 return -1;
793         }
794
795         if (sanitize && sanitize->channel_snapshot
796                 && sanitize->channel_snapshot(snapshot)) {
797                 return 0;
798         }
799
800         msg = ast_json_pack("{s: s, s: o, s: o}",
801                 "type", "StasisEnd",
802                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
803                 "channel", ast_channel_snapshot_to_json(snapshot, NULL));
804         if (!msg) {
805                 return -1;
806         }
807
808         app_send(app, msg);
809         return 0;
810 }
811
812 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
813 {
814         while (!control_is_done(control)) {
815                 int command_count;
816                 command_count = control_dispatch_all(control, chan);
817
818                 ao2_lock(control);
819
820                 if (control_command_count(control)) {
821                         /* If the command queue isn't empty, something added to the queue before it was locked. */
822                         ao2_unlock(control);
823                         continue;
824                 }
825
826                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
827                         control_mark_done(control);
828                         ao2_unlock(control);
829                         break;
830                 }
831                 ao2_unlock(control);
832         }
833 }
834
835 int stasis_app_control_is_done(struct stasis_app_control *control)
836 {
837         return control_is_done(control);
838 }
839
840 /*! /brief Stasis dialplan application callback */
841 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
842                     char *argv[])
843 {
844         SCOPED_MODULE_USE(ast_module_info->self);
845
846         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
847         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
848         struct ast_bridge *bridge = NULL;
849         int res = 0;
850
851         ast_assert(chan != NULL);
852
853         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
854         if (!app) {
855                 ast_log(LOG_ERROR,
856                         "Stasis app '%s' not registered\n", app_name);
857                 return -1;
858         }
859         if (!app_is_active(app)) {
860                 ast_log(LOG_ERROR,
861                         "Stasis app '%s' not active\n", app_name);
862                 return -1;
863         }
864
865         control = control_create(chan, app);
866         if (!control) {
867                 ast_log(LOG_ERROR, "Allocated failed\n");
868                 return -1;
869         }
870         ao2_link(app_controls, control);
871
872         res = send_start_msg(app, chan, argc, argv);
873         if (res != 0) {
874                 ast_log(LOG_ERROR,
875                         "Error sending start message to '%s'\n", app_name);
876                 return -1;
877         }
878
879         res = app_subscribe_channel(app, chan);
880         if (res != 0) {
881                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
882                         app_name, ast_channel_name(chan));
883                 return -1;
884         }
885
886         while (!control_is_done(control)) {
887                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
888                 int r;
889                 int command_count;
890                 RAII_VAR(struct ast_bridge *, last_bridge, NULL, ao2_cleanup);
891
892                 /* Check to see if a bridge absorbed our hangup frame */
893                 if (ast_check_hangup_locked(chan)) {
894                         break;
895                 }
896
897                 last_bridge = bridge;
898                 bridge = ao2_bump(stasis_app_get_bridge(control));
899
900                 if (bridge != last_bridge) {
901                         app_unsubscribe_bridge(app, last_bridge);
902                         app_subscribe_bridge(app, bridge);
903                 }
904
905                 if (bridge) {
906                         /* Bridge is handling channel frames */
907                         control_wait(control);
908                         control_dispatch_all(control, chan);
909                         continue;
910                 }
911
912                 r = ast_waitfor(chan, MAX_WAIT_MS);
913
914                 if (r < 0) {
915                         ast_debug(3, "%s: Poll error\n",
916                                   ast_channel_uniqueid(chan));
917                         break;
918                 }
919
920                 command_count = control_dispatch_all(control, chan);
921
922                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
923                         /* Command drained the channel; wait for next frame */
924                         continue;
925                 }
926
927                 if (r == 0) {
928                         /* Timeout */
929                         continue;
930                 }
931
932                 f = ast_read(chan);
933                 if (!f) {
934                         /* Continue on in the dialplan */
935                         ast_debug(3, "%s: Hangup (no more frames)\n",
936                                 ast_channel_uniqueid(chan));
937                         break;
938                 }
939
940                 if (f->frametype == AST_FRAME_CONTROL) {
941                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
942                                 /* Continue on in the dialplan */
943                                 ast_debug(3, "%s: Hangup\n",
944                                         ast_channel_uniqueid(chan));
945                                 break;
946                         }
947                 }
948         }
949
950         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
951         app_unsubscribe_channel(app, chan);
952         ao2_cleanup(bridge);
953
954         res = send_end_msg(app, chan);
955         if (res != 0) {
956                 ast_log(LOG_ERROR,
957                         "Error sending end message to %s\n", app_name);
958                 return res;
959         }
960
961         /* There's an off chance that app is ready for cleanup. Go ahead
962          * and clean up, just in case
963          */
964         cleanup();
965
966         return res;
967 }
968
969 int stasis_app_send(const char *app_name, struct ast_json *message)
970 {
971         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
972
973         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
974         if (!app) {
975                 /* XXX We can do a better job handling late binding, queueing up
976                  * the call for a few seconds to wait for the app to register.
977                  */
978                 ast_log(LOG_WARNING,
979                         "Stasis app '%s' not registered\n", app_name);
980                 return -1;
981         }
982         app_send(app, message);
983         return 0;
984 }
985
986 static struct stasis_app *find_app_by_name(const char *app_name)
987 {
988         struct stasis_app *res = NULL;
989
990         if (!ast_strlen_zero(app_name)) {
991                 res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
992         }
993
994         if (!res) {
995                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
996                         app_name ? : "(null)");
997         }
998         return res;
999 }
1000
1001 static int append_name(void *obj, void *arg, int flags)
1002 {
1003         struct stasis_app *app = obj;
1004         struct ao2_container *apps = arg;
1005
1006         ast_str_container_add(apps, stasis_app_name(app));
1007         return 0;
1008 }
1009
1010 struct ao2_container *stasis_app_get_all(void)
1011 {
1012         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
1013
1014         apps = ast_str_container_alloc(1);
1015         if (!apps) {
1016                 return NULL;
1017         }
1018
1019         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
1020
1021         return ao2_bump(apps);
1022 }
1023
1024 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
1025 {
1026         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1027
1028         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
1029
1030         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1031         if (app) {
1032                 app_update(app, handler, data);
1033         } else {
1034                 app = app_create(app_name, handler, data);
1035                 if (app) {
1036                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
1037                 } else {
1038                         return -1;
1039                 }
1040         }
1041
1042         /* We lazily clean up the apps_registry, because it's good enough to
1043          * prevent memory leaks, and we're lazy.
1044          */
1045         cleanup();
1046         return 0;
1047 }
1048
1049 void stasis_app_unregister(const char *app_name)
1050 {
1051         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1052
1053         if (!app_name) {
1054                 return;
1055         }
1056
1057         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1058         if (!app) {
1059                 ast_log(LOG_ERROR,
1060                         "Stasis app '%s' not registered\n", app_name);
1061                 return;
1062         }
1063
1064         app_deactivate(app);
1065
1066         /* There's a decent chance that app is ready for cleanup. Go ahead
1067          * and clean up, just in case
1068          */
1069         cleanup();
1070 }
1071
1072 /*!
1073  * \internal \brief List of registered event sources.
1074  */
1075 AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
1076
1077 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
1078 {
1079         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1080         AST_LIST_INSERT_TAIL(&event_sources, obj, next);
1081         /* only need to bump the module ref on non-core sources because the
1082            core ones are [un]registered by this module. */
1083         if (!stasis_app_is_core_event_source(obj)) {
1084                 ast_module_ref(ast_module_info->self);
1085         }
1086 }
1087
1088 void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
1089 {
1090         struct stasis_app_event_source *source;
1091         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1092         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
1093                 if (source == obj) {
1094                         AST_RWLIST_REMOVE_CURRENT(next);
1095                         if (!stasis_app_is_core_event_source(obj)) {
1096                                 ast_module_unref(ast_module_info->self);
1097                         }
1098                         break;
1099                 }
1100         }
1101         AST_RWLIST_TRAVERSE_SAFE_END;
1102 }
1103
1104 /*!
1105  * \internal
1106  * \brief Convert event source data to JSON.
1107  *
1108  * Calls each event source that has a "to_json" handler allowing each
1109  * source to add data to the given JSON object.
1110  *
1111  * \param app application associated with the event source
1112  * \param json a json object to "fill"
1113  *
1114  * \retval The given json object.
1115  */
1116 static struct ast_json *app_event_sources_to_json(
1117         const struct stasis_app *app, struct ast_json *json)
1118 {
1119         struct stasis_app_event_source *source;
1120         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1121         AST_LIST_TRAVERSE(&event_sources, source, next) {
1122                 if (source->to_json) {
1123                         source->to_json(app, json);
1124                 }
1125         }
1126         return json;
1127 }
1128
1129 static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
1130 {
1131         if (!app) {
1132                 return NULL;
1133         }
1134
1135         return app_event_sources_to_json(app, app_to_json(app));
1136 }
1137
1138 struct ast_json *stasis_app_to_json(const char *app_name)
1139 {
1140         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1141
1142         return stasis_app_object_to_json(app);
1143 }
1144
1145 /*!
1146  * \internal
1147  * \brief Finds an event source that matches a uri scheme.
1148  *
1149  * Uri(s) should begin with a particular scheme that can be matched
1150  * against an event source.
1151  *
1152  * \param uri uri containing a scheme to match
1153  *
1154  * \retval an event source if found, NULL otherwise.
1155  */
1156 static struct stasis_app_event_source *app_event_source_find(const char *uri)
1157 {
1158         struct stasis_app_event_source *source;
1159         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1160         AST_LIST_TRAVERSE(&event_sources, source, next) {
1161                 if (ast_begins_with(uri, source->scheme)) {
1162                         return source;
1163                 }
1164         }
1165         return NULL;
1166 }
1167
1168 /*!
1169  * \internal
1170  * \brief Callback for subscription handling
1171  *
1172  * \param app [un]subscribing application
1173  * \param uri scheme:id of an event source
1174  * \param event_source being [un]subscribed [from]to
1175  *
1176  * \retval stasis_app_subscribe_res return code.
1177  */
1178 typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
1179         struct stasis_app *app, const char *uri,
1180         struct stasis_app_event_source *event_source);
1181
1182 /*!
1183  * \internal
1184  * \brief Subscriptions handler for application [un]subscribing.
1185  *
1186  * \param app_name Name of the application to subscribe.
1187  * \param event_source_uris URIs for the event sources to subscribe to.
1188  * \param event_sources_count Array size of event_source_uris.
1189  * \param json Optional output pointer for JSON representation of the app
1190  *             after adding the subscription.
1191  * \param handler [un]subscribe handler
1192  *
1193  * \retval stasis_app_subscribe_res return code.
1194  */
1195 static enum stasis_app_subscribe_res app_handle_subscriptions(
1196         const char *app_name, const char **event_source_uris,
1197         int event_sources_count, struct ast_json **json,
1198         app_subscription_handler handler)
1199 {
1200         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1201         int i;
1202
1203         if (!app) {
1204                 return STASIS_ASR_APP_NOT_FOUND;
1205         }
1206
1207         for (i = 0; i < event_sources_count; ++i) {
1208                 const char *uri = event_source_uris[i];
1209                 enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
1210                 struct stasis_app_event_source *event_source;
1211
1212                 if (!(event_source = app_event_source_find(uri))) {
1213                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1214                         return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1215                 }
1216
1217                 if (handler &&
1218                     ((res = handler(app, uri, event_source)))) {
1219                         return res;
1220                 }
1221         }
1222
1223         if (json) {
1224                 ast_debug(3, "%s: Successful; setting results\n", app_name);
1225                 *json = stasis_app_object_to_json(app);
1226         }
1227
1228         return STASIS_ASR_OK;
1229 }
1230
1231 enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
1232         struct ast_channel *chan)
1233 {
1234         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1235         int res;
1236
1237         if (!app) {
1238                 return STASIS_ASR_APP_NOT_FOUND;
1239         }
1240
1241         ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
1242
1243         res = app_subscribe_channel(app, chan);
1244         if (res != 0) {
1245                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
1246                         app_name, ast_channel_uniqueid(chan));
1247                 return STASIS_ASR_INTERNAL_ERROR;
1248         }
1249
1250         return STASIS_ASR_OK;
1251 }
1252
1253
1254 /*!
1255  * \internal
1256  * \brief Subscribe an app to an event source.
1257  *
1258  * \param app subscribing application
1259  * \param uri scheme:id of an event source
1260  * \param event_source being subscribed to
1261  *
1262  * \retval stasis_app_subscribe_res return code.
1263  */
1264 static enum stasis_app_subscribe_res app_subscribe(
1265         struct stasis_app *app, const char *uri,
1266         struct stasis_app_event_source *event_source)
1267 {
1268         const char *app_name = stasis_app_name(app);
1269         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1270
1271         ast_debug(3, "%s: Checking %s\n", app_name, uri);
1272
1273         if (!event_source->find ||
1274             (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
1275                 ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
1276                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1277         }
1278
1279         ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
1280
1281         if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
1282                 ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
1283                         app_name, uri);
1284                 return STASIS_ASR_INTERNAL_ERROR;
1285         }
1286
1287         return STASIS_ASR_OK;
1288 }
1289
1290 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
1291         const char **event_source_uris, int event_sources_count,
1292         struct ast_json **json)
1293 {
1294         return app_handle_subscriptions(
1295                 app_name, event_source_uris, event_sources_count,
1296                 json, app_subscribe);
1297 }
1298
1299 /*!
1300  * \internal
1301  * \brief Unsubscribe an app from an event source.
1302  *
1303  * \param app application to unsubscribe
1304  * \param uri scheme:id of an event source
1305  * \param event_source being unsubscribed from
1306  *
1307  * \retval stasis_app_subscribe_res return code.
1308  */
1309 static enum stasis_app_subscribe_res app_unsubscribe(
1310         struct stasis_app *app, const char *uri,
1311         struct stasis_app_event_source *event_source)
1312 {
1313         const char *app_name = stasis_app_name(app);
1314         const char *id = uri + strlen(event_source->scheme);
1315
1316         if (!event_source->is_subscribed ||
1317             (!event_source->is_subscribed(app, id))) {
1318                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1319         }
1320
1321         ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
1322
1323         if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
1324                 ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
1325                         app_name, uri);
1326                 return -1;
1327         }
1328         return 0;
1329 }
1330
1331 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1332         const char **event_source_uris, int event_sources_count,
1333         struct ast_json **json)
1334 {
1335         return app_handle_subscriptions(
1336                 app_name, event_source_uris, event_sources_count,
1337                 json, app_unsubscribe);
1338 }
1339
1340 enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
1341         const char *event_name,
1342         const char **source_uris, int sources_count,
1343         struct ast_json *json_variables)
1344 {
1345         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1346         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
1347         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1348         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1349         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1350         enum stasis_app_subscribe_res res = STASIS_APP_USER_INTERNAL_ERROR;
1351         struct ast_json *json_value;
1352         int have_channel = 0;
1353         int i;
1354
1355         if (!app) {
1356                 ast_log(LOG_WARNING, "App %s not found\n", app_name);
1357                 return STASIS_APP_USER_APP_NOT_FOUND;
1358         }
1359
1360         blob = json_variables;
1361         if (!blob) {
1362                 blob = ast_json_pack("{}");
1363         }
1364         json_value = ast_json_string_create(event_name);
1365         if (!json_value) {
1366                 ast_log(LOG_ERROR, "unable to create json string\n");
1367                 return res;
1368         }
1369         if (ast_json_object_set(blob, "eventname", json_value)) {
1370                 ast_log(LOG_ERROR, "unable to set eventname to blob\n");
1371                 return res;
1372         }
1373
1374         multi = ast_multi_object_blob_create(blob);
1375
1376         for (i = 0; i < sources_count; ++i) {
1377                 const char *uri = source_uris[i];
1378                 void *snapshot=NULL;
1379                 enum stasis_user_multi_object_snapshot_type type;
1380
1381                 if (ast_begins_with(uri, "channel:")) {
1382                         type = STASIS_UMOS_CHANNEL;
1383                         snapshot = ast_channel_snapshot_get_latest(uri + 8);
1384                         have_channel = 1;
1385                 } else if (ast_begins_with(uri, "bridge:")) {
1386                         type = STASIS_UMOS_BRIDGE;
1387                         snapshot = ast_bridge_snapshot_get_latest(uri + 7);
1388                 } else if (ast_begins_with(uri, "endpoint:")) {
1389                         type = STASIS_UMOS_ENDPOINT;
1390                         snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
1391                 } else {
1392                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1393                         return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
1394                 }
1395                 if (!snapshot) {
1396                         ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
1397                         return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
1398                 }
1399                 ast_multi_object_blob_add(multi, type, snapshot);
1400         }
1401
1402         message = stasis_message_create(ast_multi_user_event_type(), multi);
1403         if (!message) {
1404                 ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
1405                 return res;
1406         }
1407
1408         /*
1409          * Publishing to two different topics is normally to be avoided -- except
1410          * in this case both are final destinations with no forwards (only listeners).
1411          * The message has to be delivered to the application topic for ARI, but a
1412          * copy is also delivered directly to the manager for AMI if there is a channel.
1413          */
1414         stasis_publish(ast_app_get_topic(app), message);
1415
1416         if (have_channel) {
1417                 stasis_publish(ast_manager_get_topic(), message);
1418         }
1419
1420         return STASIS_APP_USER_OK;
1421 }
1422
1423 void stasis_app_ref(void)
1424 {
1425         ast_module_ref(ast_module_info->self);
1426 }
1427
1428 void stasis_app_unref(void)
1429 {
1430         ast_module_unref(ast_module_info->self);
1431 }
1432
1433 static int unload_module(void)
1434 {
1435         stasis_app_unregister_event_sources();
1436
1437         messaging_cleanup();
1438
1439         ao2_cleanup(apps_registry);
1440         apps_registry = NULL;
1441
1442         ao2_cleanup(app_controls);
1443         app_controls = NULL;
1444
1445         ao2_cleanup(app_bridges);
1446         app_bridges = NULL;
1447
1448         ao2_cleanup(app_bridges_moh);
1449         app_bridges_moh = NULL;
1450
1451         ao2_cleanup(app_bridges_playback);
1452         app_bridges_playback = NULL;
1453
1454         return 0;
1455 }
1456
1457 /* \brief Sanitization callback for channel snapshots */
1458 static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
1459 {
1460         if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
1461                 return 0;
1462         }
1463         return 1;
1464 }
1465
1466 /* \brief Sanitization callback for channel unique IDs */
1467 static int channel_id_sanitizer(const char *id)
1468 {
1469         RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup);
1470
1471         return channel_snapshot_sanitizer(snapshot);
1472 }
1473
1474 /* \brief Sanitization callbacks for communication to Stasis applications */
1475 struct stasis_message_sanitizer app_sanitizer = {
1476         .channel_id = channel_id_sanitizer,
1477         .channel_snapshot = channel_snapshot_sanitizer,
1478 };
1479
1480 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
1481 {
1482         return &app_sanitizer;
1483 }
1484
1485 static int load_module(void)
1486 {
1487         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
1488         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
1489         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
1490         app_bridges_moh = ao2_container_alloc_hash(
1491                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1492                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
1493         app_bridges_playback = ao2_container_alloc_hash(
1494                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1495                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
1496         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh || !app_bridges_playback) {
1497                 unload_module();
1498                 return AST_MODULE_LOAD_FAILURE;
1499         }
1500
1501         if (messaging_init()) {
1502                 unload_module();
1503                 return AST_MODULE_LOAD_FAILURE;
1504         }
1505
1506         bridge_stasis_init();
1507
1508         stasis_app_register_event_sources();
1509
1510         return AST_MODULE_LOAD_SUCCESS;
1511 }
1512
1513 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
1514         .support_level = AST_MODULE_SUPPORT_CORE,
1515         .load = load_module,
1516         .unload = unload_module,
1517         );