chan_ooh323: fix h323 log file path
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 #include "asterisk/astobj2.h"
57 #include "asterisk/callerid.h"
58 #include "asterisk/module.h"
59 #include "asterisk/stasis_app_impl.h"
60 #include "asterisk/stasis_channels.h"
61 #include "asterisk/stasis_bridges.h"
62 #include "asterisk/stasis_endpoints.h"
63 #include "asterisk/stasis_message_router.h"
64 #include "asterisk/strings.h"
65 #include "stasis/app.h"
66 #include "stasis/control.h"
67 #include "stasis/messaging.h"
68 #include "stasis/stasis_bridge.h"
69 #include "asterisk/core_unreal.h"
70 #include "asterisk/musiconhold.h"
71 #include "asterisk/causes.h"
72 #include "asterisk/stringfields.h"
73 #include "asterisk/bridge_after.h"
74 #include "asterisk/format_cache.h"
75
76 /*! Time to wait for a frame in the application */
77 #define MAX_WAIT_MS 200
78
79 /*!
80  * \brief Number of buckets for the Stasis application hash table.  Remember to
81  * keep it a prime number!
82  */
83 #define APPS_NUM_BUCKETS 127
84
85 /*!
86  * \brief Number of buckets for the Stasis application hash table.  Remember to
87  * keep it a prime number!
88  */
89 #define CONTROLS_NUM_BUCKETS 127
90
91 /*!
92  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
93  * keep it a prime number!
94  */
95 #define BRIDGES_NUM_BUCKETS 127
96
97 /*!
98  * \brief Stasis application container.
99  */
100 struct ao2_container *apps_registry;
101
102 struct ao2_container *app_controls;
103
104 struct ao2_container *app_bridges;
105
106 struct ao2_container *app_bridges_moh;
107
108 struct ao2_container *app_bridges_playback;
109
110 /*!
111  * \internal \brief List of registered event sources.
112  */
113 AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
114
115 static struct ast_json *stasis_end_to_json(struct stasis_message *message,
116                 const struct stasis_message_sanitizer *sanitize)
117 {
118         struct ast_channel_blob *payload = stasis_message_data(message);
119         struct ast_json *msg;
120
121         if (sanitize && sanitize->channel_snapshot &&
122                         sanitize->channel_snapshot(payload->snapshot)) {
123                 return NULL;
124         }
125
126         msg = ast_json_pack("{s: s, s: O, s: o}",
127                 "type", "StasisEnd",
128                 "timestamp", ast_json_object_get(payload->blob, "timestamp"),
129                 "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
130         if (!msg) {
131                 ast_log(LOG_ERROR, "Failed to pack JSON for StasisEnd message\n");
132                 return NULL;
133         }
134
135         return msg;
136 }
137
138 STASIS_MESSAGE_TYPE_DEFN_LOCAL(end_message_type,
139         .to_json = stasis_end_to_json);
140
141 struct start_message_blob {
142         struct ast_channel_snapshot *channel;           /*!< Channel that is entering Stasis() */
143         struct ast_channel_snapshot *replace_channel;   /*!< Channel that is being replaced (optional) */
144         struct ast_json *blob;                          /*!< JSON blob containing timestamp and args */
145 };
146
147 static struct ast_json *stasis_start_to_json(struct stasis_message *message,
148                 const struct stasis_message_sanitizer *sanitize)
149 {
150         struct start_message_blob *payload = stasis_message_data(message);
151         struct ast_json *msg;
152
153         if (sanitize && sanitize->channel_snapshot &&
154                         sanitize->channel_snapshot(payload->channel)) {
155                 return NULL;
156         }
157
158         msg = ast_json_pack("{s: s, s: O, s: O, s: o}",
159                 "type", "StasisStart",
160                 "timestamp", ast_json_object_get(payload->blob, "timestamp"),
161                 "args", ast_json_object_get(payload->blob, "args"),
162                 "channel", ast_channel_snapshot_to_json(payload->channel, NULL));
163         if (!msg) {
164                 ast_log(LOG_ERROR, "Failed to pack JSON for StasisStart message\n");
165                 return NULL;
166         }
167
168         if (payload->replace_channel) {
169                 int res = ast_json_object_set(msg, "replace_channel",
170                         ast_channel_snapshot_to_json(payload->replace_channel, NULL));
171
172                 if (res) {
173                         ast_json_unref(msg);
174                         ast_log(LOG_ERROR, "Failed to append JSON for StasisStart message\n");
175                         return NULL;
176                 }
177         }
178
179         return msg;
180 }
181
182 STASIS_MESSAGE_TYPE_DEFN_LOCAL(start_message_type,
183         .to_json = stasis_start_to_json);
184
185 /*! AO2 hash function for \ref app */
186 static int app_hash(const void *obj, const int flags)
187 {
188         const struct stasis_app *app;
189         const char *key;
190
191         switch (flags & OBJ_SEARCH_MASK) {
192         case OBJ_SEARCH_KEY:
193                 key = obj;
194                 break;
195         case OBJ_SEARCH_OBJECT:
196                 app = obj;
197                 key = stasis_app_name(app);
198                 break;
199         default:
200                 /* Hash can only work on something with a full key. */
201                 ast_assert(0);
202                 return 0;
203         }
204         return ast_str_hash(key);
205 }
206
207 /*! AO2 comparison function for \ref app */
208 static int app_compare(void *obj, void *arg, int flags)
209 {
210         const struct stasis_app *object_left = obj;
211         const struct stasis_app *object_right = arg;
212         const char *right_key = arg;
213         int cmp;
214
215         switch (flags & OBJ_SEARCH_MASK) {
216         case OBJ_SEARCH_OBJECT:
217                 right_key = stasis_app_name(object_right);
218                 /* Fall through */
219         case OBJ_SEARCH_KEY:
220                 cmp = strcmp(stasis_app_name(object_left), right_key);
221                 break;
222         case OBJ_SEARCH_PARTIAL_KEY:
223                 /*
224                  * We could also use a partial key struct containing a length
225                  * so strlen() does not get called for every comparison instead.
226                  */
227                 cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
228                 break;
229         default:
230                 /*
231                  * What arg points to is specific to this traversal callback
232                  * and has no special meaning to astobj2.
233                  */
234                 cmp = 0;
235                 break;
236         }
237         if (cmp) {
238                 return 0;
239         }
240         /*
241          * At this point the traversal callback is identical to a sorted
242          * container.
243          */
244         return CMP_MATCH;
245 }
246
247 /*! AO2 hash function for \ref stasis_app_control */
248 static int control_hash(const void *obj, const int flags)
249 {
250         const struct stasis_app_control *control;
251         const char *key;
252
253         switch (flags & OBJ_SEARCH_MASK) {
254         case OBJ_SEARCH_KEY:
255                 key = obj;
256                 break;
257         case OBJ_SEARCH_OBJECT:
258                 control = obj;
259                 key = stasis_app_control_get_channel_id(control);
260                 break;
261         default:
262                 /* Hash can only work on something with a full key. */
263                 ast_assert(0);
264                 return 0;
265         }
266         return ast_str_hash(key);
267 }
268
269 /*! AO2 comparison function for \ref stasis_app_control */
270 static int control_compare(void *obj, void *arg, int flags)
271 {
272         const struct stasis_app_control *object_left = obj;
273         const struct stasis_app_control *object_right = arg;
274         const char *right_key = arg;
275         int cmp;
276
277         switch (flags & OBJ_SEARCH_MASK) {
278         case OBJ_SEARCH_OBJECT:
279                 right_key = stasis_app_control_get_channel_id(object_right);
280                 /* Fall through */
281         case OBJ_SEARCH_KEY:
282                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
283                 break;
284         case OBJ_SEARCH_PARTIAL_KEY:
285                 /*
286                  * We could also use a partial key struct containing a length
287                  * so strlen() does not get called for every comparison instead.
288                  */
289                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
290                 break;
291         default:
292                 /*
293                  * What arg points to is specific to this traversal callback
294                  * and has no special meaning to astobj2.
295                  */
296                 cmp = 0;
297                 break;
298         }
299         if (cmp) {
300                 return 0;
301         }
302         /*
303          * At this point the traversal callback is identical to a sorted
304          * container.
305          */
306         return CMP_MATCH;
307 }
308
309 static int cleanup_cb(void *obj, void *arg, int flags)
310 {
311         struct stasis_app *app = obj;
312
313         if (!app_is_finished(app)) {
314                 return 0;
315         }
316
317         ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
318         app_shutdown(app);
319
320         return CMP_MATCH;
321
322 }
323
324 /*!
325  * \brief Clean up any old apps that we don't need any more.
326  */
327 static void cleanup(void)
328 {
329         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
330                 cleanup_cb, NULL);
331 }
332
333 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
334 {
335         return control_create(chan, NULL);
336 }
337
338 struct stasis_app_control *stasis_app_control_find_by_channel(
339         const struct ast_channel *chan)
340 {
341         if (chan == NULL) {
342                 return NULL;
343         }
344
345         return stasis_app_control_find_by_channel_id(
346                 ast_channel_uniqueid(chan));
347 }
348
349 struct stasis_app_control *stasis_app_control_find_by_channel_id(
350         const char *channel_id)
351 {
352         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
353 }
354
355 /*! AO2 hash function for bridges container  */
356 static int bridges_hash(const void *obj, const int flags)
357 {
358         const struct ast_bridge *bridge;
359         const char *key;
360
361         switch (flags & OBJ_SEARCH_MASK) {
362         case OBJ_SEARCH_KEY:
363                 key = obj;
364                 break;
365         case OBJ_SEARCH_OBJECT:
366                 bridge = obj;
367                 key = bridge->uniqueid;
368                 break;
369         default:
370                 /* Hash can only work on something with a full key. */
371                 ast_assert(0);
372                 return 0;
373         }
374         return ast_str_hash(key);
375 }
376
377 /*! AO2 comparison function for bridges container */
378 static int bridges_compare(void *obj, void *arg, int flags)
379 {
380         const struct ast_bridge *object_left = obj;
381         const struct ast_bridge *object_right = arg;
382         const char *right_key = arg;
383         int cmp;
384
385         switch (flags & OBJ_SEARCH_MASK) {
386         case OBJ_SEARCH_OBJECT:
387                 right_key = object_right->uniqueid;
388                 /* Fall through */
389         case OBJ_SEARCH_KEY:
390                 cmp = strcmp(object_left->uniqueid, right_key);
391                 break;
392         case OBJ_SEARCH_PARTIAL_KEY:
393                 /*
394                  * We could also use a partial key struct containing a length
395                  * so strlen() does not get called for every comparison instead.
396                  */
397                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
398                 break;
399         default:
400                 /*
401                  * What arg points to is specific to this traversal callback
402                  * and has no special meaning to astobj2.
403                  */
404                 cmp = 0;
405                 break;
406         }
407         if (cmp) {
408                 return 0;
409         }
410         /*
411          * At this point the traversal callback is identical to a sorted
412          * container.
413          */
414         return CMP_MATCH;
415 }
416
417 /*!
418  *  Used with app_bridges_moh and app_bridge_control, they provide links
419  *  between bridges and channels used for ARI application purposes
420  */
421 struct stasis_app_bridge_channel_wrapper {
422         AST_DECLARE_STRING_FIELDS(
423                 AST_STRING_FIELD(channel_id);
424                 AST_STRING_FIELD(bridge_id);
425         );
426 };
427
428 static void stasis_app_bridge_channel_wrapper_destructor(void *obj)
429 {
430         struct stasis_app_bridge_channel_wrapper *wrapper = obj;
431         ast_string_field_free_memory(wrapper);
432 }
433
434 /*! AO2 hash function for the bridges moh container */
435 static int bridges_channel_hash_fn(const void *obj, const int flags)
436 {
437         const struct stasis_app_bridge_channel_wrapper *wrapper;
438         const char *key;
439
440         switch (flags & OBJ_SEARCH_MASK) {
441         case OBJ_SEARCH_KEY:
442                 key = obj;
443                 break;
444         case OBJ_SEARCH_OBJECT:
445                 wrapper = obj;
446                 key = wrapper->bridge_id;
447                 break;
448         default:
449                 /* Hash can only work on something with a full key. */
450                 ast_assert(0);
451                 return 0;
452         }
453         return ast_str_hash(key);
454 }
455
456 static int bridges_channel_sort_fn(const void *obj_left, const void *obj_right, const int flags)
457 {
458         const struct stasis_app_bridge_channel_wrapper *left = obj_left;
459         const struct stasis_app_bridge_channel_wrapper *right = obj_right;
460         const char *right_key = obj_right;
461         int cmp;
462
463         switch (flags & OBJ_SEARCH_MASK) {
464         case OBJ_SEARCH_OBJECT:
465                 right_key = right->bridge_id;
466                 /* Fall through */
467         case OBJ_SEARCH_KEY:
468                 cmp = strcmp(left->bridge_id, right_key);
469                 break;
470         case OBJ_SEARCH_PARTIAL_KEY:
471                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
472                 break;
473         default:
474                 /* Sort can only work on something with a full or partial key. */
475                 ast_assert(0);
476                 cmp = 0;
477                 break;
478         }
479         return cmp;
480 }
481
482 /*! Request a bridge MOH channel */
483 static struct ast_channel *prepare_bridge_moh_channel(void)
484 {
485         struct ast_channel *chan;
486         struct ast_format_cap *cap;
487
488         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
489         if (!cap) {
490                 return NULL;
491         }
492
493         ast_format_cap_append(cap, ast_format_slin, 0);
494
495         chan = ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL);
496         ao2_ref(cap, -1);
497
498         return chan;
499 }
500
501 /*! Provides the moh channel with a thread so it can actually play its music */
502 static void *moh_channel_thread(void *data)
503 {
504         struct stasis_app_bridge_channel_wrapper *moh_wrapper = data;
505         struct ast_channel *moh_channel = ast_channel_get_by_name(moh_wrapper->channel_id);
506         struct ast_frame *f;
507
508         if (!moh_channel) {
509                 ao2_unlink(app_bridges_moh, moh_wrapper);
510                 ao2_ref(moh_wrapper, -1);
511                 return NULL;
512         }
513
514         /* Read and discard any frame coming from the stasis bridge. */
515         for (;;) {
516                 if (ast_waitfor(moh_channel, -1) < 0) {
517                         /* Error or hungup */
518                         break;
519                 }
520
521                 f = ast_read(moh_channel);
522                 if (!f) {
523                         /* Hungup */
524                         break;
525                 }
526                 ast_frfree(f);
527         }
528
529         ao2_unlink(app_bridges_moh, moh_wrapper);
530         ao2_ref(moh_wrapper, -1);
531
532         ast_moh_stop(moh_channel);
533         ast_hangup(moh_channel);
534
535         return NULL;
536 }
537
538 /*!
539  * \internal
540  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
541  *
542  * \param bridge Which bridge this moh channel exists for
543  *
544  * \retval NULL if the channel could not be created, pushed, or linked
545  * \retval Reference to the channel on success
546  */
547 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
548 {
549         struct stasis_app_bridge_channel_wrapper *new_wrapper;
550         struct ast_channel *chan;
551         pthread_t threadid;
552
553         chan = prepare_bridge_moh_channel();
554         if (!chan) {
555                 return NULL;
556         }
557
558         if (stasis_app_channel_unreal_set_internal(chan)) {
559                 ast_hangup(chan);
560                 return NULL;
561         }
562
563         if (ast_unreal_channel_push_to_bridge(chan, bridge,
564                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
565                 ast_hangup(chan);
566                 return NULL;
567         }
568
569         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
570                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
571         if (!new_wrapper) {
572                 ast_hangup(chan);
573                 return NULL;
574         }
575
576         if (ast_string_field_init(new_wrapper, AST_UUID_STR_LEN + AST_CHANNEL_NAME)
577                 || ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid)
578                 || ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan))) {
579                 ao2_ref(new_wrapper, -1);
580                 ast_hangup(chan);
581                 return NULL;
582         }
583
584         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
585                 ao2_ref(new_wrapper, -1);
586                 ast_hangup(chan);
587                 return NULL;
588         }
589
590         /* Pass the new_wrapper ref to moh_channel_thread() */
591         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, new_wrapper)) {
592                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
593                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
594                 ao2_ref(new_wrapper, -1);
595                 ast_hangup(chan);
596                 return NULL;
597         }
598
599         return chan;
600 }
601
602 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
603 {
604         struct ast_channel *chan;
605         struct stasis_app_bridge_channel_wrapper *moh_wrapper;
606
607         ao2_lock(app_bridges_moh);
608         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
609         if (!moh_wrapper) {
610                 chan = bridge_moh_create(bridge);
611         }
612         ao2_unlock(app_bridges_moh);
613
614         if (moh_wrapper) {
615                 chan = ast_channel_get_by_name(moh_wrapper->channel_id);
616                 ao2_ref(moh_wrapper, -1);
617         }
618
619         return chan;
620 }
621
622 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
623 {
624         struct stasis_app_bridge_channel_wrapper *moh_wrapper;
625         struct ast_channel *chan;
626
627         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
628         if (!moh_wrapper) {
629                 return -1;
630         }
631
632         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
633         ao2_ref(moh_wrapper, -1);
634         if (!chan) {
635                 return -1;
636         }
637
638         ast_moh_stop(chan);
639         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
640         ao2_cleanup(chan);
641
642         return 0;
643 }
644
645 /*! Removes the bridge to playback channel link */
646 static void remove_bridge_playback(char *bridge_id)
647 {
648         struct stasis_app_bridge_channel_wrapper *wrapper;
649         struct stasis_app_control *control;
650
651         wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
652
653         if (wrapper) {
654                 control = stasis_app_control_find_by_channel_id(wrapper->channel_id);
655                 if (control) {
656                         ao2_unlink(app_controls, control);
657                         ao2_ref(control, -1);
658                 }
659                 ao2_ref(wrapper, -1);
660         }
661         ast_free(bridge_id);
662 }
663
664 static void playback_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
665 {
666         char *bridge_id = data;
667
668         remove_bridge_playback(bridge_id);
669 }
670
671 static void playback_after_bridge_cb(struct ast_channel *chan, void *data)
672 {
673         char *bridge_id = data;
674
675         remove_bridge_playback(bridge_id);
676 }
677
678 int stasis_app_bridge_playback_channel_add(struct ast_bridge *bridge,
679         struct ast_channel *chan,
680         struct stasis_app_control *control)
681 {
682         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
683         char *bridge_id = ast_strdup(bridge->uniqueid);
684
685         if (!bridge_id) {
686                 return -1;
687         }
688
689         if (ast_bridge_set_after_callback(chan,
690                 playback_after_bridge_cb, playback_after_bridge_cb_failed, bridge_id)) {
691                 ast_free(bridge_id);
692                 return -1;
693         }
694
695         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
696                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
697         if (!new_wrapper) {
698                 return -1;
699         }
700
701         if (ast_string_field_init(new_wrapper, 32)) {
702                 return -1;
703         }
704
705         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
706         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
707
708         if (!ao2_link(app_bridges_playback, new_wrapper)) {
709                 return -1;
710         }
711
712         ao2_link(app_controls, control);
713         return 0;
714 }
715
716 void stasis_app_bridge_playback_channel_remove(char *bridge_id,
717         struct stasis_app_control *control)
718 {
719         struct stasis_app_bridge_channel_wrapper *wrapper;
720
721         wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
722         if (wrapper) {
723                 /* If wrapper is not found, then that means the after bridge callback has been
724                  * called or is in progress. No need to unlink the control here since that has
725                  * been done or is about to be done in the after bridge callback
726                  */
727                 ao2_unlink(app_controls, control);
728                 ao2_ref(wrapper, -1);
729         }
730 }
731
732 struct ast_channel *stasis_app_bridge_playback_channel_find(struct ast_bridge *bridge)
733 {
734         struct stasis_app_bridge_channel_wrapper *playback_wrapper;
735         struct ast_channel *chan;
736
737         playback_wrapper = ao2_find(app_bridges_playback, bridge->uniqueid, OBJ_SEARCH_KEY);
738         if (!playback_wrapper) {
739                 return NULL;
740         }
741
742         chan = ast_channel_get_by_name(playback_wrapper->channel_id);
743         ao2_ref(playback_wrapper, -1);
744         return chan;
745 }
746
747 struct ast_bridge *stasis_app_bridge_find_by_id(
748         const char *bridge_id)
749 {
750         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
751 }
752
753
754 /*!
755  * \brief In addition to running ao2_cleanup(), this function also removes the
756  * object from the app_controls container.
757  */
758 static void control_unlink(struct stasis_app_control *control)
759 {
760         if (!control) {
761                 return;
762         }
763
764         ao2_unlink(app_controls, control);
765         ao2_cleanup(control);
766 }
767
768 static struct ast_bridge *bridge_create_common(const char *type, const char *name, const char *id, int invisible)
769 {
770         struct ast_bridge *bridge;
771         char *requested_type, *requested_types = ast_strdupa(S_OR(type, "mixing"));
772         int capabilities = 0;
773         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
774                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
775                 | AST_BRIDGE_FLAG_TRANSFER_BRIDGE_ONLY;
776         enum ast_bridge_video_mode_type video_mode = AST_BRIDGE_VIDEO_MODE_TALKER_SRC;
777
778         if (invisible) {
779                 flags |= AST_BRIDGE_FLAG_INVISIBLE;
780         }
781
782         while ((requested_type = strsep(&requested_types, ","))) {
783                 requested_type = ast_strip(requested_type);
784
785                 if (!strcmp(requested_type, "mixing")) {
786                         capabilities |= STASIS_BRIDGE_MIXING_CAPABILITIES;
787                         flags |= AST_BRIDGE_FLAG_SMART;
788                 } else if (!strcmp(requested_type, "holding")) {
789                         capabilities |= AST_BRIDGE_CAPABILITY_HOLDING;
790                 } else if (!strcmp(requested_type, "dtmf_events") ||
791                         !strcmp(requested_type, "proxy_media")) {
792                         capabilities &= ~AST_BRIDGE_CAPABILITY_NATIVE;
793                 } else if (!strcmp(requested_type, "video_sfu")) {
794                         video_mode = AST_BRIDGE_VIDEO_MODE_SFU;
795                 }
796         }
797
798         /* For an SFU video bridge we ensure it always remains in multimix for the best experience. */
799         if (video_mode == AST_BRIDGE_VIDEO_MODE_SFU) {
800                 capabilities = AST_BRIDGE_CAPABILITY_MULTIMIX;
801                 flags &= ~AST_BRIDGE_FLAG_SMART;
802         }
803
804         if (!capabilities
805                 /* Holding and mixing capabilities don't mix. */
806                 || ((capabilities & AST_BRIDGE_CAPABILITY_HOLDING)
807                         && (capabilities & (STASIS_BRIDGE_MIXING_CAPABILITIES)))) {
808                 return NULL;
809         }
810
811         bridge = bridge_stasis_new(capabilities, flags, name, id);
812         if (bridge) {
813                 if (video_mode == AST_BRIDGE_VIDEO_MODE_SFU) {
814                         ast_bridge_set_sfu_video_mode(bridge);
815                         /* We require a minimum 5 seconds between video updates to stop floods from clients,
816                          * this should rarely be changed but should become configurable in the future.
817                          */
818                         ast_bridge_set_video_update_discard(bridge, 5);
819                 } else {
820                         ast_bridge_set_talker_src_video_mode(bridge);
821                 }
822                 if (!ao2_link(app_bridges, bridge)) {
823                         ast_bridge_destroy(bridge, 0);
824                         bridge = NULL;
825                 }
826         }
827         return bridge;
828 }
829
830 struct ast_bridge *stasis_app_bridge_create(const char *type, const char *name, const char *id)
831 {
832         return bridge_create_common(type, name, id, 0);
833 }
834
835 struct ast_bridge *stasis_app_bridge_create_invisible(const char *type, const char *name, const char *id)
836 {
837         return bridge_create_common(type, name, id, 1);
838 }
839
840 void stasis_app_bridge_destroy(const char *bridge_id)
841 {
842         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
843         if (!bridge) {
844                 return;
845         }
846         ao2_unlink(app_bridges, bridge);
847         ast_bridge_destroy(bridge, 0);
848 }
849
850 struct replace_channel_store {
851         struct ast_channel_snapshot *snapshot;
852         char *app;
853 };
854
855 static void replace_channel_destroy(void *obj)
856 {
857         struct replace_channel_store *replace = obj;
858
859         ao2_cleanup(replace->snapshot);
860         ast_free(replace->app);
861         ast_free(replace);
862 }
863
864 static const struct ast_datastore_info replace_channel_store_info = {
865         .type = "replace-channel-store",
866         .destroy = replace_channel_destroy,
867 };
868
869 static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create)
870 {
871         struct ast_datastore *datastore;
872         struct replace_channel_store *ret;
873
874         ast_channel_lock(chan);
875         datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL);
876         if (!datastore && !no_create) {
877                 datastore = ast_datastore_alloc(&replace_channel_store_info, NULL);
878                 if (datastore) {
879                         ast_channel_datastore_add(chan, datastore);
880                 }
881         }
882
883         if (!datastore) {
884                 ast_channel_unlock(chan);
885                 return NULL;
886         }
887
888         if (!datastore->data) {
889                 datastore->data = ast_calloc(1, sizeof(struct replace_channel_store));
890         }
891
892         ret = datastore->data;
893         ast_channel_unlock(chan);
894
895         return ret;
896 }
897
898 int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
899 {
900         struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
901
902         if (!replace) {
903                 return -1;
904         }
905
906         ao2_replace(replace->snapshot, replace_snapshot);
907         return 0;
908 }
909
910 int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app)
911 {
912         struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
913
914         if (!replace) {
915                 return -1;
916         }
917
918         ast_free(replace->app);
919         replace->app = NULL;
920
921         if (replace_app) {
922                 replace->app = ast_strdup(replace_app);
923                 if (!replace->app) {
924                         return -1;
925                 }
926         }
927
928         return 0;
929 }
930
931 static struct ast_channel_snapshot *get_replace_channel_snapshot(struct ast_channel *chan)
932 {
933         struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
934         struct ast_channel_snapshot *replace_channel_snapshot;
935
936         if (!replace) {
937                 return NULL;
938         }
939
940         replace_channel_snapshot = replace->snapshot;
941         replace->snapshot = NULL;
942
943         return replace_channel_snapshot;
944 }
945
946 char *app_get_replace_channel_app(struct ast_channel *chan)
947 {
948         struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
949         char *replace_channel_app;
950
951         if (!replace) {
952                 return NULL;
953         }
954
955         replace_channel_app = replace->app;
956         replace->app = NULL;
957
958         return replace_channel_app;
959 }
960
961 static void start_message_blob_dtor(void *obj)
962 {
963         struct start_message_blob *payload = obj;
964
965         ao2_cleanup(payload->channel);
966         ao2_cleanup(payload->replace_channel);
967         ast_json_unref(payload->blob);
968 }
969
970 static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app *app,
971         int argc, char *argv[], struct ast_channel_snapshot *snapshot,
972         struct ast_channel_snapshot *replace_channel_snapshot)
973 {
974         struct ast_json *json_blob;
975         struct ast_json *json_args;
976         struct start_message_blob *payload;
977         struct stasis_message *msg;
978         int i;
979
980         if (app_subscribe_channel(app, chan)) {
981                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
982                         stasis_app_name(app), ast_channel_name(chan));
983                 return -1;
984         }
985
986         payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
987         if (!payload) {
988                 ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
989                 return -1;
990         }
991
992         payload->channel = ao2_bump(snapshot);
993         payload->replace_channel = ao2_bump(replace_channel_snapshot);
994
995         json_blob = ast_json_pack("{s: s, s: o, s: []}",
996                 "app", stasis_app_name(app),
997                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
998                 "args");
999         if (!json_blob) {
1000                 ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
1001                 ao2_ref(payload, -1);
1002                 return -1;
1003         }
1004         payload->blob = json_blob;
1005
1006
1007         /* Append arguments to args array */
1008         json_args = ast_json_object_get(json_blob, "args");
1009         ast_assert(json_args != NULL);
1010         for (i = 0; i < argc; ++i) {
1011                 int r = ast_json_array_append(json_args,
1012                                               ast_json_string_create(argv[i]));
1013                 if (r != 0) {
1014                         ast_log(LOG_ERROR, "Error appending to StasisStart message\n");
1015                         ao2_ref(payload, -1);
1016                         return -1;
1017                 }
1018         }
1019
1020
1021         msg = stasis_message_create(start_message_type(), payload);
1022         ao2_ref(payload, -1);
1023         if (!msg) {
1024                 ast_log(LOG_ERROR, "Error sending StasisStart message\n");
1025                 return -1;
1026         }
1027
1028         if (replace_channel_snapshot) {
1029                 app_unsubscribe_channel_id(app, replace_channel_snapshot->base->uniqueid);
1030         }
1031         stasis_publish(ast_app_get_topic(app), msg);
1032         ao2_ref(msg, -1);
1033         return 0;
1034 }
1035
1036 static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
1037         int argc, char *argv[])
1038 {
1039         int ret = -1;
1040         struct ast_channel_snapshot *snapshot;
1041         struct ast_channel_snapshot *replace_channel_snapshot;
1042
1043         ast_assert(chan != NULL);
1044
1045         replace_channel_snapshot = get_replace_channel_snapshot(chan);
1046
1047         /* Set channel info */
1048         ast_channel_lock(chan);
1049         snapshot = ast_channel_snapshot_create(chan);
1050         ast_channel_unlock(chan);
1051         if (snapshot) {
1052                 ret = send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot);
1053                 ao2_ref(snapshot, -1);
1054         }
1055         ao2_cleanup(replace_channel_snapshot);
1056
1057         return ret;
1058 }
1059
1060 static void remove_masquerade_store(struct ast_channel *chan);
1061
1062 int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
1063 {
1064         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
1065         struct ast_json *blob;
1066         struct stasis_message *msg;
1067
1068         if (sanitize && sanitize->channel
1069                 && sanitize->channel(chan)) {
1070                 return 0;
1071         }
1072
1073         blob = ast_json_pack("{s: s, s: o}",
1074                 "app", stasis_app_name(app),
1075                 "timestamp", ast_json_timeval(ast_tvnow(), NULL)
1076                 );
1077         if (!blob) {
1078                 ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n");
1079                 return -1;
1080         }
1081
1082         remove_masquerade_store(chan);
1083         app_unsubscribe_channel(app, chan);
1084         msg = ast_channel_blob_create(chan, end_message_type(), blob);
1085         if (msg) {
1086                 stasis_publish(ast_app_get_topic(app), msg);
1087         }
1088         ao2_cleanup(msg);
1089         ast_json_unref(blob);
1090
1091         return 0;
1092 }
1093
1094 static int masq_match_cb(void *obj, void *data, int flags)
1095 {
1096         struct stasis_app_control *control = obj;
1097         struct ast_channel *chan = data;
1098
1099         if (!strcmp(ast_channel_uniqueid(chan),
1100                 stasis_app_control_get_channel_id(control))) {
1101                 return CMP_MATCH;
1102         }
1103
1104         return 0;
1105 }
1106
1107 static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1108 {
1109         struct stasis_app_control *control;
1110
1111         /*
1112          * At this point, old_chan is the channel pointer that is in Stasis() and
1113          * has the unknown channel's name in it while new_chan is the channel pointer
1114          * that is not in Stasis(), but has the guts of the channel that Stasis() knows
1115          * about.
1116          *
1117          * Find and unlink control since the channel has a new name/uniqueid
1118          * and its hash has changed.  Since the channel is leaving stasis don't
1119          * bother putting it back into the container.  Nobody is going to
1120          * remove it from the container later.
1121          */
1122         control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, old_chan);
1123         if (!control) {
1124                 ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
1125                 return;
1126         }
1127
1128         /* send the StasisEnd message to the app */
1129         stasis_app_channel_set_stasis_end_published(new_chan);
1130         app_send_end_msg(control_app(control), new_chan);
1131
1132         /* remove the datastore */
1133         remove_masquerade_store(old_chan);
1134
1135         ao2_cleanup(control);
1136 }
1137
1138 static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1139 {
1140         RAII_VAR(struct ast_channel_snapshot *, new_snapshot, NULL, ao2_cleanup);
1141         RAII_VAR(struct ast_channel_snapshot *, old_snapshot, NULL, ao2_cleanup);
1142         struct stasis_app_control *control;
1143
1144         /* At this point, new_chan is the channel pointer that is in Stasis() and
1145          * has the unknown channel's name in it while old_chan is the channel pointer
1146          * that is not in Stasis(), but has the guts of the channel that Stasis() knows
1147          * about */
1148
1149         /* grab a snapshot for the channel that is jumping into Stasis() */
1150         new_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
1151         if (!new_snapshot) {
1152                 ast_log(LOG_ERROR, "Could not get snapshot for masquerading channel\n");
1153                 return;
1154         }
1155
1156         /* grab a snapshot for the channel that has been kicked out of Stasis() */
1157         old_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(old_chan));
1158         if (!old_snapshot) {
1159                 ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
1160                 return;
1161         }
1162
1163         /*
1164          * Find, unlink, and relink control since the channel has a new
1165          * name/uniqueid and its hash has changed.
1166          */
1167         control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, new_chan);
1168         if (!control) {
1169                 ast_log(LOG_ERROR, "Could not find control for masquerading channel\n");
1170                 return;
1171         }
1172         ao2_link(app_controls, control);
1173
1174
1175         /* send the StasisStart with replace_channel to the app */
1176         send_start_msg_snapshots(new_chan, control_app(control), 0, NULL, new_snapshot,
1177                 old_snapshot);
1178         /* send the StasisEnd message to the app */
1179         app_send_end_msg(control_app(control), old_chan);
1180
1181         ao2_cleanup(control);
1182 }
1183
1184 static const struct ast_datastore_info masquerade_store_info = {
1185         .type = "stasis-masqerade",
1186         .chan_fixup = channel_stolen_cb,
1187         .chan_breakdown = channel_replaced_cb,
1188 };
1189
1190 static int has_masquerade_store(struct ast_channel *chan)
1191 {
1192         SCOPED_CHANNELLOCK(lock, chan);
1193         return !!ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
1194 }
1195
1196 static int add_masquerade_store(struct ast_channel *chan)
1197 {
1198         struct ast_datastore *datastore;
1199
1200         SCOPED_CHANNELLOCK(lock, chan);
1201         if (ast_channel_datastore_find(chan, &masquerade_store_info, NULL)) {
1202                 return 0;
1203         }
1204
1205         datastore = ast_datastore_alloc(&masquerade_store_info, NULL);
1206         if (!datastore) {
1207                 return -1;
1208         }
1209
1210         ast_channel_datastore_add(chan, datastore);
1211
1212         return 0;
1213 }
1214
1215 static void remove_masquerade_store(struct ast_channel *chan)
1216 {
1217         struct ast_datastore *datastore;
1218
1219         SCOPED_CHANNELLOCK(lock, chan);
1220         datastore = ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
1221         if (!datastore) {
1222                 return;
1223         }
1224
1225         ast_channel_datastore_remove(chan, datastore);
1226         ast_datastore_free(datastore);
1227 }
1228
1229 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
1230 {
1231         while (!control_is_done(control)) {
1232                 int command_count;
1233                 command_count = control_dispatch_all(control, chan);
1234
1235                 ao2_lock(control);
1236
1237                 if (control_command_count(control)) {
1238                         /* If the command queue isn't empty, something added to the queue before it was locked. */
1239                         ao2_unlock(control);
1240                         continue;
1241                 }
1242
1243                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
1244                         control_mark_done(control);
1245                         ao2_unlock(control);
1246                         break;
1247                 }
1248                 ao2_unlock(control);
1249         }
1250 }
1251
1252 int stasis_app_control_is_done(struct stasis_app_control *control)
1253 {
1254         return control_is_done(control);
1255 }
1256
1257 void stasis_app_control_flush_queue(struct stasis_app_control *control)
1258 {
1259         control_flush_queue(control);
1260 }
1261
1262 struct ast_datastore_info set_end_published_info = {
1263         .type = "stasis_end_published",
1264 };
1265
1266 void stasis_app_channel_set_stasis_end_published(struct ast_channel *chan)
1267 {
1268         struct ast_datastore *datastore;
1269
1270         datastore = ast_datastore_alloc(&set_end_published_info, NULL);
1271         if (datastore) {
1272                 ast_channel_lock(chan);
1273                 ast_channel_datastore_add(chan, datastore);
1274                 ast_channel_unlock(chan);
1275         }
1276 }
1277
1278 int stasis_app_channel_is_stasis_end_published(struct ast_channel *chan)
1279 {
1280         struct ast_datastore *datastore;
1281
1282         ast_channel_lock(chan);
1283         datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
1284         ast_channel_unlock(chan);
1285
1286         return datastore ? 1 : 0;
1287 }
1288
1289 static void remove_stasis_end_published(struct ast_channel *chan)
1290 {
1291         struct ast_datastore *datastore;
1292
1293         ast_channel_lock(chan);
1294         datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
1295         if (datastore) {
1296                 ast_channel_datastore_remove(chan, datastore);
1297                 ast_datastore_free(datastore);
1298         }
1299         ast_channel_unlock(chan);
1300 }
1301
1302 /*! /brief Stasis dialplan application callback */
1303 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
1304                     char *argv[])
1305 {
1306         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1307         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
1308         struct ast_bridge *bridge = NULL;
1309         int res = 0;
1310         int needs_depart;
1311
1312         ast_assert(chan != NULL);
1313
1314         /* Just in case there's a lingering indication that the channel has had a stasis
1315          * end published on it, remove that now.
1316          */
1317         remove_stasis_end_published(chan);
1318
1319         if (!apps_registry) {
1320                 return -1;
1321         }
1322
1323         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1324         if (!app) {
1325                 ast_log(LOG_ERROR,
1326                         "Stasis app '%s' not registered\n", app_name);
1327                 return -1;
1328         }
1329         if (!app_is_active(app)) {
1330                 ast_log(LOG_ERROR,
1331                         "Stasis app '%s' not active\n", app_name);
1332                 return -1;
1333         }
1334
1335         control = control_create(chan, app);
1336         if (!control) {
1337                 ast_log(LOG_ERROR, "Control allocation failed or Stasis app '%s' not registered\n", app_name);
1338                 return -1;
1339         }
1340
1341         if (!control_app(control)) {
1342                 ast_log(LOG_ERROR, "Stasis app '%s' not registered\n", app_name);
1343                 return -1;
1344         }
1345
1346         if (!app_is_active(control_app(control))) {
1347                 ast_log(LOG_ERROR, "Stasis app '%s' not active\n", app_name);
1348                 return -1;
1349         }
1350         ao2_link(app_controls, control);
1351
1352         if (add_masquerade_store(chan)) {
1353                 ast_log(LOG_ERROR, "Failed to attach masquerade detector\n");
1354                 return -1;
1355         }
1356
1357         res = send_start_msg(control_app(control), chan, argc, argv);
1358         if (res != 0) {
1359                 ast_log(LOG_ERROR,
1360                         "Error sending start message to '%s'\n", app_name);
1361                 remove_masquerade_store(chan);
1362                 return -1;
1363         }
1364
1365         /* Pull queued prestart commands and execute */
1366         control_prestart_dispatch_all(control, chan);
1367
1368         while (!control_is_done(control)) {
1369                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
1370                 int r;
1371                 int command_count;
1372                 RAII_VAR(struct ast_bridge *, last_bridge, NULL, ao2_cleanup);
1373
1374                 /* Check to see if a bridge absorbed our hangup frame */
1375                 if (ast_check_hangup_locked(chan)) {
1376                         control_mark_done(control);
1377                         break;
1378                 }
1379
1380                 /* control->next_app is only modified within the control thread, so this is safe */
1381                 if (control_next_app(control)) {
1382                         struct stasis_app *next_app = ao2_find(apps_registry, control_next_app(control), OBJ_SEARCH_KEY);
1383
1384                         if (next_app && app_is_active(next_app)) {
1385                                 int idx;
1386                                 int next_argc;
1387                                 char **next_argv;
1388
1389                                 /* If something goes wrong in this conditional, res will need to be non-zero
1390                                  * so that the code below the exec loop knows something went wrong during a move.
1391                                  */
1392                                 if (!stasis_app_channel_is_stasis_end_published(chan)) {
1393                                         res = has_masquerade_store(chan) && app_send_end_msg(control_app(control), chan);
1394                                         if (res != 0) {
1395                                                 ast_log(LOG_ERROR,
1396                                                         "Error sending end message to %s\n", stasis_app_name(control_app(control)));
1397                                                 control_mark_done(control);
1398                                                 ao2_ref(next_app, -1);
1399                                                 break;
1400                                         }
1401                                 } else {
1402                                         remove_stasis_end_published(chan);
1403                                 }
1404
1405                                 /* This will ao2_bump next_app, and unref the previous app by 1 */
1406                                 control_set_app(control, next_app);
1407
1408                                 /* There's a chance that the previous application is ready for clean up, so go ahead
1409                                  * and do that now.
1410                                  */
1411                                 cleanup();
1412
1413                                 /* We need to add another masquerade store, otherwise the leave message will
1414                                  * not show up for the correct application.
1415                                  */
1416                                 if (add_masquerade_store(chan)) {
1417                                         ast_log(LOG_ERROR, "Failed to attach masquerade detector\n");
1418                                         res = -1;
1419                                         control_mark_done(control);
1420                                         ao2_ref(next_app, -1);
1421                                         break;
1422                                 }
1423
1424                                 /* We MUST get the size before the list, as control_next_app_args steals the elements
1425                                  * from the string vector.
1426                                  */
1427                                 next_argc = control_next_app_args_size(control);
1428                                 next_argv = control_next_app_args(control);
1429
1430                                 res = send_start_msg(control_app(control), chan, next_argc, next_argv);
1431
1432                                 /* Even if res != 0, we still need to free the memory we got from control_argv */
1433                                 if (next_argv) {
1434                                         for (idx = 0; idx < next_argc; idx++) {
1435                                                 ast_free(next_argv[idx]);
1436                                         }
1437                                         ast_free(next_argv);
1438                                 }
1439
1440                                 if (res != 0) {
1441                                         ast_log(LOG_ERROR,
1442                                                 "Error sending start message to '%s'\n", stasis_app_name(control_app(control)));
1443                                         remove_masquerade_store(chan);
1444                                         control_mark_done(control);
1445                                         ao2_ref(next_app, -1);
1446                                         break;
1447                                 }
1448
1449                                 /* Done switching applications, free memory and clean up */
1450                                 control_move_cleanup(control);
1451                         } else {
1452                                 /* If we can't switch applications, do nothing */
1453                                 struct ast_json *msg;
1454                                 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
1455
1456                                 if (!next_app) {
1457                                         ast_log(LOG_ERROR, "Could not move to Stasis app '%s' - not registered\n",
1458                                                 control_next_app(control));
1459                                 } else {
1460                                         ast_log(LOG_ERROR, "Could not move to Stasis app '%s' - not active\n",
1461                                                 control_next_app(control));
1462                                 }
1463
1464                                 snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
1465                                 if (!snapshot) {
1466                                         ast_log(LOG_ERROR, "Could not get channel shapshot for '%s'\n",
1467                                                 ast_channel_name(chan));
1468                                 } else {
1469                                         struct ast_json *json_args;
1470                                         int next_argc = control_next_app_args_size(control);
1471                                         char **next_argv = control_next_app_args(control);
1472
1473                                         msg = ast_json_pack("{s: s, s: o, s: o, s: s, s: []}",
1474                                                 "type", "ApplicationMoveFailed",
1475                                                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
1476                                                 "channel", ast_channel_snapshot_to_json(snapshot, NULL),
1477                                                 "destination", control_next_app(control),
1478                                                 "args");
1479                                         if (!msg) {
1480                                                 ast_log(LOG_ERROR, "Failed to pack JSON for ApplicationMoveFailed message\n");
1481                                         } else {
1482                                                 json_args = ast_json_object_get(msg, "args");
1483                                                 if (!json_args) {
1484                                                         ast_log(LOG_ERROR, "Could not get args json array");
1485                                                 } else {
1486                                                         int r = 0;
1487                                                         int idx;
1488                                                         for (idx = 0; idx < next_argc; ++idx) {
1489                                                                 r = ast_json_array_append(json_args,
1490                                                                         ast_json_string_create(next_argv[idx]));
1491                                                                 if (r != 0) {
1492                                                                         ast_log(LOG_ERROR, "Error appending to ApplicationMoveFailed message\n");
1493                                                                         break;
1494                                                                 }
1495                                                         }
1496                                                         if (r == 0) {
1497                                                                 app_send(control_app(control), msg);
1498                                                         }
1499                                                 }
1500                                                 ast_json_unref(msg);
1501                                         }
1502                                 }
1503                         }
1504                         control_move_cleanup(control);
1505                         ao2_cleanup(next_app);
1506                 }
1507
1508                 last_bridge = bridge;
1509                 bridge = ao2_bump(stasis_app_get_bridge(control));
1510
1511                 if (bridge != last_bridge) {
1512                         if (last_bridge) {
1513                                 app_unsubscribe_bridge(control_app(control), last_bridge);
1514                         }
1515                         if (bridge) {
1516                                 app_subscribe_bridge(control_app(control), bridge);
1517                         }
1518                 }
1519
1520                 if (bridge) {
1521                         /* Bridge/dial is handling channel frames */
1522                         control_wait(control);
1523                         control_dispatch_all(control, chan);
1524                         continue;
1525                 }
1526
1527                 r = ast_waitfor(chan, MAX_WAIT_MS);
1528
1529                 if (r < 0) {
1530                         ast_debug(3, "%s: Poll error\n",
1531                                   ast_channel_uniqueid(chan));
1532                         control_mark_done(control);
1533                         break;
1534                 }
1535
1536                 command_count = control_dispatch_all(control, chan);
1537
1538                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
1539                         /* Command drained the channel; wait for next frame */
1540                         continue;
1541                 }
1542
1543                 if (r == 0) {
1544                         /* Timeout */
1545                         continue;
1546                 }
1547
1548                 f = ast_read(chan);
1549                 if (!f) {
1550                         /* Continue on in the dialplan */
1551                         ast_debug(3, "%s: Hangup (no more frames)\n",
1552                                 ast_channel_uniqueid(chan));
1553                         control_mark_done(control);
1554                         break;
1555                 }
1556
1557                 if (f->frametype == AST_FRAME_CONTROL) {
1558                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
1559                                 /* Continue on in the dialplan */
1560                                 ast_debug(3, "%s: Hangup\n",
1561                                         ast_channel_uniqueid(chan));
1562                                 control_mark_done(control);
1563                                 break;
1564                         }
1565                 }
1566         }
1567
1568         ast_channel_lock(chan);
1569         needs_depart = (ast_channel_internal_bridge_channel(chan) != NULL);
1570         ast_channel_unlock(chan);
1571         if (needs_depart) {
1572                 ast_bridge_depart(chan);
1573         }
1574
1575         if (stasis_app_get_bridge(control)) {
1576                 app_unsubscribe_bridge(control_app(control), stasis_app_get_bridge(control));
1577         }
1578         ao2_cleanup(bridge);
1579
1580         /* Only publish a stasis_end event if it hasn't already been published */
1581         if (!res && !stasis_app_channel_is_stasis_end_published(chan)) {
1582                 /* A masquerade has occurred and this message will be wrong so it
1583                  * has already been sent elsewhere. */
1584                 res = has_masquerade_store(chan) && app_send_end_msg(control_app(control), chan);
1585                 if (res != 0) {
1586                         ast_log(LOG_ERROR,
1587                                 "Error sending end message to %s\n", stasis_app_name(control_app(control)));
1588                         return res;
1589                 }
1590         } else {
1591                 remove_stasis_end_published(chan);
1592         }
1593
1594         control_flush_queue(control);
1595
1596         /* Stop any lingering silence generator */
1597         control_silence_stop_now(control);
1598
1599         /* There's an off chance that app is ready for cleanup. Go ahead
1600          * and clean up, just in case
1601          */
1602         cleanup();
1603
1604         /* The control needs to be removed from the controls container in
1605          * case a new PBX is started and ends up coming back into Stasis.
1606          */
1607         control_unlink(control);
1608         control = NULL;
1609
1610         if (!res && !ast_channel_pbx(chan)) {
1611                 int chan_hungup;
1612
1613                 /* The ASYNCGOTO softhangup flag may have broken the channel out of
1614                  * its bridge to run dialplan, so if there's no pbx on the channel
1615                  * let it run dialplan here. Otherwise, it will run when this
1616                  * application exits. */
1617                 ast_channel_lock(chan);
1618                 ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ASYNCGOTO);
1619                 chan_hungup = ast_check_hangup(chan);
1620                 ast_channel_unlock(chan);
1621
1622                 if (!chan_hungup) {
1623                         struct ast_pbx_args pbx_args;
1624
1625                         memset(&pbx_args, 0, sizeof(pbx_args));
1626                         pbx_args.no_hangup_chan = 1;
1627
1628                         res = ast_pbx_run_args(chan, &pbx_args);
1629                 }
1630         }
1631
1632         return res;
1633 }
1634
1635 int stasis_app_send(const char *app_name, struct ast_json *message)
1636 {
1637         struct stasis_app *app;
1638
1639         if (!apps_registry) {
1640                 return -1;
1641         }
1642
1643         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1644         if (!app) {
1645                 /* XXX We can do a better job handling late binding, queueing up
1646                  * the call for a few seconds to wait for the app to register.
1647                  */
1648                 ast_log(LOG_WARNING,
1649                         "Stasis app '%s' not registered\n", app_name);
1650                 return -1;
1651         }
1652         app_send(app, message);
1653         ao2_ref(app, -1);
1654
1655         return 0;
1656 }
1657
1658 static struct stasis_app *find_app_by_name(const char *app_name)
1659 {
1660         struct stasis_app *res = NULL;
1661
1662         if (!apps_registry) {
1663                 return NULL;
1664         }
1665
1666         if (!ast_strlen_zero(app_name)) {
1667                 res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1668         }
1669
1670         return res;
1671 }
1672
1673 struct stasis_app *stasis_app_get_by_name(const char *name)
1674 {
1675         return find_app_by_name(name);
1676 }
1677
1678 static int append_name(void *obj, void *arg, int flags)
1679 {
1680         struct stasis_app *app = obj;
1681         struct ao2_container *apps = arg;
1682
1683         ast_str_container_add(apps, stasis_app_name(app));
1684         return 0;
1685 }
1686
1687 struct ao2_container *stasis_app_get_all(void)
1688 {
1689         struct ao2_container *apps;
1690
1691         if (!apps_registry) {
1692                 return NULL;
1693         }
1694
1695         apps = ast_str_container_alloc(1);
1696         if (!apps) {
1697                 return NULL;
1698         }
1699
1700         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
1701
1702         return apps;
1703 }
1704
1705 static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
1706 {
1707         struct stasis_app *app;
1708
1709         if (!apps_registry) {
1710                 return -1;
1711         }
1712
1713         ao2_lock(apps_registry);
1714         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1715         if (app) {
1716                 app_update(app, handler, data);
1717         } else {
1718                 app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
1719                 if (!app) {
1720                         ao2_unlock(apps_registry);
1721                         return -1;
1722                 }
1723
1724                 if (all_events) {
1725                         struct stasis_app_event_source *source;
1726
1727                         AST_RWLIST_RDLOCK(&event_sources);
1728                         AST_LIST_TRAVERSE(&event_sources, source, next) {
1729                                 if (!source->subscribe) {
1730                                         continue;
1731                                 }
1732
1733                                 source->subscribe(app, NULL);
1734                         }
1735                         AST_RWLIST_UNLOCK(&event_sources);
1736                 }
1737                 ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
1738         }
1739
1740         /* We lazily clean up the apps_registry, because it's good enough to
1741          * prevent memory leaks, and we're lazy.
1742          */
1743         cleanup();
1744         ao2_unlock(apps_registry);
1745         ao2_ref(app, -1);
1746         return 0;
1747 }
1748
1749 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
1750 {
1751         return __stasis_app_register(app_name, handler, data, 0);
1752 }
1753
1754 int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
1755 {
1756         return __stasis_app_register(app_name, handler, data, 1);
1757 }
1758
1759 void stasis_app_unregister(const char *app_name)
1760 {
1761         struct stasis_app *app;
1762
1763         if (!app_name) {
1764                 return;
1765         }
1766
1767         if (!apps_registry) {
1768                 return;
1769         }
1770
1771         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1772         if (!app) {
1773                 ast_log(LOG_ERROR,
1774                         "Stasis app '%s' not registered\n", app_name);
1775                 return;
1776         }
1777
1778         app_deactivate(app);
1779
1780         /* There's a decent chance that app is ready for cleanup. Go ahead
1781          * and clean up, just in case
1782          */
1783         cleanup();
1784
1785         ao2_ref(app, -1);
1786 }
1787
1788 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
1789 {
1790         AST_RWLIST_WRLOCK(&event_sources);
1791         AST_LIST_INSERT_TAIL(&event_sources, obj, next);
1792         AST_RWLIST_UNLOCK(&event_sources);
1793 }
1794
1795 void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
1796 {
1797         struct stasis_app_event_source *source;
1798
1799         AST_RWLIST_WRLOCK(&event_sources);
1800         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
1801                 if (source == obj) {
1802                         AST_RWLIST_REMOVE_CURRENT(next);
1803                         break;
1804                 }
1805         }
1806         AST_RWLIST_TRAVERSE_SAFE_END;
1807         AST_RWLIST_UNLOCK(&event_sources);
1808 }
1809
1810 /*!
1811  * \internal
1812  * \brief Convert event source data to JSON.
1813  *
1814  * Calls each event source that has a "to_json" handler allowing each
1815  * source to add data to the given JSON object.
1816  *
1817  * \param app application associated with the event source
1818  * \param json a json object to "fill"
1819  *
1820  * \retval The given json object.
1821  */
1822 static struct ast_json *app_event_sources_to_json(
1823         const struct stasis_app *app, struct ast_json *json)
1824 {
1825         struct stasis_app_event_source *source;
1826
1827         AST_RWLIST_RDLOCK(&event_sources);
1828         AST_LIST_TRAVERSE(&event_sources, source, next) {
1829                 if (source->to_json) {
1830                         source->to_json(app, json);
1831                 }
1832         }
1833         AST_RWLIST_UNLOCK(&event_sources);
1834
1835         return json;
1836 }
1837
1838 struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
1839 {
1840         if (!app) {
1841                 return NULL;
1842         }
1843
1844         return stasis_app_event_filter_to_json(
1845                 app, app_event_sources_to_json(app, app_to_json(app)));
1846 }
1847
1848 struct ast_json *stasis_app_to_json(const char *app_name)
1849 {
1850         struct stasis_app *app = find_app_by_name(app_name);
1851         struct ast_json *json = stasis_app_object_to_json(app);
1852
1853         ao2_cleanup(app);
1854
1855         return json;
1856 }
1857
1858 /*!
1859  * \internal
1860  * \brief Finds an event source that matches a uri scheme.
1861  *
1862  * Uri(s) should begin with a particular scheme that can be matched
1863  * against an event source.
1864  *
1865  * \param uri uri containing a scheme to match
1866  *
1867  * \retval an event source if found, NULL otherwise.
1868  */
1869 static struct stasis_app_event_source *app_event_source_find(const char *uri)
1870 {
1871         struct stasis_app_event_source *source;
1872
1873         AST_RWLIST_RDLOCK(&event_sources);
1874         AST_LIST_TRAVERSE(&event_sources, source, next) {
1875                 if (ast_begins_with(uri, source->scheme)) {
1876                         break;
1877                 }
1878         }
1879         AST_RWLIST_UNLOCK(&event_sources);
1880
1881         return source;
1882 }
1883
1884 /*!
1885  * \internal
1886  * \brief Callback for subscription handling
1887  *
1888  * \param app [un]subscribing application
1889  * \param uri scheme:id of an event source
1890  * \param event_source being [un]subscribed [from]to
1891  *
1892  * \retval stasis_app_subscribe_res return code.
1893  */
1894 typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
1895         struct stasis_app *app, const char *uri,
1896         struct stasis_app_event_source *event_source);
1897
1898 /*!
1899  * \internal
1900  * \brief Subscriptions handler for application [un]subscribing.
1901  *
1902  * \param app_name Name of the application to subscribe.
1903  * \param event_source_uris URIs for the event sources to subscribe to.
1904  * \param event_sources_count Array size of event_source_uris.
1905  * \param json Optional output pointer for JSON representation of the app
1906  *             after adding the subscription.
1907  * \param handler [un]subscribe handler
1908  *
1909  * \retval stasis_app_subscribe_res return code.
1910  */
1911 static enum stasis_app_subscribe_res app_handle_subscriptions(
1912         const char *app_name, const char **event_source_uris,
1913         int event_sources_count, struct ast_json **json,
1914         app_subscription_handler handler)
1915 {
1916         struct stasis_app *app = find_app_by_name(app_name);
1917         int i;
1918
1919         ast_assert(handler != NULL);
1920
1921         if (!app) {
1922                 return STASIS_ASR_APP_NOT_FOUND;
1923         }
1924
1925         for (i = 0; i < event_sources_count; ++i) {
1926                 const char *uri = event_source_uris[i];
1927                 struct stasis_app_event_source *event_source;
1928                 enum stasis_app_subscribe_res res;
1929
1930                 event_source = app_event_source_find(uri);
1931                 if (!event_source) {
1932                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1933                         ao2_ref(app, -1);
1934
1935                         return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1936                 }
1937
1938                 res = handler(app, uri, event_source);
1939                 if (res != STASIS_ASR_OK) {
1940                         ao2_ref(app, -1);
1941
1942                         return res;
1943                 }
1944         }
1945
1946         if (json) {
1947                 ast_debug(3, "%s: Successful; setting results\n", app_name);
1948                 *json = stasis_app_object_to_json(app);
1949         }
1950
1951         ao2_ref(app, -1);
1952
1953         return STASIS_ASR_OK;
1954 }
1955
1956 enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
1957         struct ast_channel *chan)
1958 {
1959         struct stasis_app *app = find_app_by_name(app_name);
1960         int res;
1961
1962         if (!app) {
1963                 return STASIS_ASR_APP_NOT_FOUND;
1964         }
1965
1966         ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
1967
1968         res = app_subscribe_channel(app, chan);
1969         ao2_ref(app, -1);
1970
1971         if (res != 0) {
1972                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
1973                         app_name, ast_channel_uniqueid(chan));
1974                 return STASIS_ASR_INTERNAL_ERROR;
1975         }
1976
1977         return STASIS_ASR_OK;
1978 }
1979
1980
1981 /*!
1982  * \internal
1983  * \brief Subscribe an app to an event source.
1984  *
1985  * \param app subscribing application
1986  * \param uri scheme:id of an event source
1987  * \param event_source being subscribed to
1988  *
1989  * \retval stasis_app_subscribe_res return code.
1990  */
1991 static enum stasis_app_subscribe_res app_subscribe(
1992         struct stasis_app *app, const char *uri,
1993         struct stasis_app_event_source *event_source)
1994 {
1995         const char *app_name = stasis_app_name(app);
1996         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1997
1998         ast_debug(3, "%s: Checking %s\n", app_name, uri);
1999
2000         if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
2001             (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
2002                 ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
2003                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
2004         }
2005
2006         ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
2007
2008         if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
2009                 ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
2010                         app_name, uri);
2011                 return STASIS_ASR_INTERNAL_ERROR;
2012         }
2013
2014         return STASIS_ASR_OK;
2015 }
2016
2017 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
2018         const char **event_source_uris, int event_sources_count,
2019         struct ast_json **json)
2020 {
2021         return app_handle_subscriptions(
2022                 app_name, event_source_uris, event_sources_count,
2023                 json, app_subscribe);
2024 }
2025
2026 /*!
2027  * \internal
2028  * \brief Unsubscribe an app from an event source.
2029  *
2030  * \param app application to unsubscribe
2031  * \param uri scheme:id of an event source
2032  * \param event_source being unsubscribed from
2033  *
2034  * \retval stasis_app_subscribe_res return code.
2035  */
2036 static enum stasis_app_subscribe_res app_unsubscribe(
2037         struct stasis_app *app, const char *uri,
2038         struct stasis_app_event_source *event_source)
2039 {
2040         const char *app_name = stasis_app_name(app);
2041         const char *id = uri + strlen(event_source->scheme);
2042
2043         if (!event_source->is_subscribed ||
2044             (!event_source->is_subscribed(app, id))) {
2045                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
2046         }
2047
2048         ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
2049
2050         if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
2051                 ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
2052                         app_name, uri);
2053                 return -1;
2054         }
2055         return 0;
2056 }
2057
2058 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
2059         const char **event_source_uris, int event_sources_count,
2060         struct ast_json **json)
2061 {
2062         return app_handle_subscriptions(
2063                 app_name, event_source_uris, event_sources_count,
2064                 json, app_unsubscribe);
2065 }
2066
2067 enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
2068         const char *event_name,
2069         const char **source_uris, int sources_count,
2070         struct ast_json *json_variables)
2071 {
2072         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
2073         struct ast_json *blob = NULL;
2074         struct ast_multi_object_blob *multi;
2075         struct stasis_message *message;
2076         enum stasis_app_user_event_res res = STASIS_APP_USER_INTERNAL_ERROR;
2077         int have_channel = 0;
2078         int i;
2079
2080         if (!app) {
2081                 ast_log(LOG_WARNING, "App %s not found\n", app_name);
2082                 return STASIS_APP_USER_APP_NOT_FOUND;
2083         }
2084
2085         if (!ast_multi_user_event_type()) {
2086                 return res;
2087         }
2088
2089         if (json_variables) {
2090                 struct ast_json *json_value = ast_json_string_create(event_name);
2091
2092                 if (json_value && !ast_json_object_set(json_variables, "eventname", json_value)) {
2093                         blob = ast_json_ref(json_variables);
2094                 }
2095         } else {
2096                 blob = ast_json_pack("{s: s}", "eventname", event_name);
2097         }
2098
2099         if (!blob) {
2100                 ast_log(LOG_ERROR, "Failed to initialize blob\n");
2101
2102                 return res;
2103         }
2104
2105         multi = ast_multi_object_blob_create(blob);
2106         ast_json_unref(blob);
2107         if (!multi) {
2108                 ast_log(LOG_ERROR, "Failed to initialize multi\n");
2109
2110                 return res;
2111         }
2112
2113         for (i = 0; i < sources_count; ++i) {
2114                 const char *uri = source_uris[i];
2115                 void *snapshot=NULL;
2116                 enum stasis_user_multi_object_snapshot_type type;
2117
2118                 if (ast_begins_with(uri, "channel:")) {
2119                         type = STASIS_UMOS_CHANNEL;
2120                         snapshot = ast_channel_snapshot_get_latest(uri + 8);
2121                         have_channel = 1;
2122                 } else if (ast_begins_with(uri, "bridge:")) {
2123                         type = STASIS_UMOS_BRIDGE;
2124                         snapshot = ast_bridge_get_snapshot_by_uniqueid(uri + 7);
2125                 } else if (ast_begins_with(uri, "endpoint:")) {
2126                         type = STASIS_UMOS_ENDPOINT;
2127                         snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
2128                 } else {
2129                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
2130                         ao2_ref(multi, -1);
2131
2132                         return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
2133                 }
2134                 if (!snapshot) {
2135                         ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
2136                         ao2_ref(multi, -1);
2137
2138                         return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
2139                 }
2140                 ast_multi_object_blob_add(multi, type, snapshot);
2141         }
2142
2143         message = stasis_message_create(ast_multi_user_event_type(), multi);
2144         ao2_ref(multi, -1);
2145
2146         if (!message) {
2147                 ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
2148                 return res;
2149         }
2150
2151         /*
2152          * Publishing to two different topics is normally to be avoided -- except
2153          * in this case both are final destinations with no forwards (only listeners).
2154          * The message has to be delivered to the application topic for ARI, but a
2155          * copy is also delivered directly to the manager for AMI if there is a channel.
2156          */
2157         stasis_publish(ast_app_get_topic(app), message);
2158
2159         if (have_channel) {
2160                 stasis_publish(ast_manager_get_topic(), message);
2161         }
2162         ao2_ref(message, -1);
2163
2164         return STASIS_APP_USER_OK;
2165 }
2166
2167 static int unload_module(void)
2168 {
2169         stasis_app_unregister_event_sources();
2170
2171         messaging_cleanup();
2172
2173         cleanup();
2174
2175         stasis_app_control_shutdown();
2176
2177         ao2_cleanup(apps_registry);
2178         apps_registry = NULL;
2179
2180         ao2_cleanup(app_controls);
2181         app_controls = NULL;
2182
2183         ao2_cleanup(app_bridges);
2184         app_bridges = NULL;
2185
2186         ao2_cleanup(app_bridges_moh);
2187         app_bridges_moh = NULL;
2188
2189         ao2_cleanup(app_bridges_playback);
2190         app_bridges_playback = NULL;
2191
2192         STASIS_MESSAGE_TYPE_CLEANUP(end_message_type);
2193         STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
2194
2195         return 0;
2196 }
2197
2198 /* \brief Sanitization callback for channel snapshots */
2199 static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
2200 {
2201         if (!snapshot || !(snapshot->base->tech_properties & AST_CHAN_TP_INTERNAL)) {
2202                 return 0;
2203         }
2204         return 1;
2205 }
2206
2207 /* \brief Sanitization callback for channels */
2208 static int channel_sanitizer(const struct ast_channel *chan)
2209 {
2210         if (!chan || !(ast_channel_tech(chan)->properties & AST_CHAN_TP_INTERNAL)) {
2211                 return 0;
2212         }
2213         return 1;
2214 }
2215
2216 /* \brief Sanitization callback for channel unique IDs */
2217 static int channel_id_sanitizer(const char *id)
2218 {
2219         struct ast_channel_snapshot *snapshot;
2220         int ret;
2221
2222         snapshot = ast_channel_snapshot_get_latest(id);
2223         ret = channel_snapshot_sanitizer(snapshot);
2224         ao2_cleanup(snapshot);
2225
2226         return ret;
2227 }
2228
2229 /* \brief Sanitization callbacks for communication to Stasis applications */
2230 struct stasis_message_sanitizer app_sanitizer = {
2231         .channel_id = channel_id_sanitizer,
2232         .channel_snapshot = channel_snapshot_sanitizer,
2233         .channel = channel_sanitizer,
2234 };
2235
2236 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
2237 {
2238         return &app_sanitizer;
2239 }
2240
2241 static const struct ast_datastore_info stasis_internal_channel_info = {
2242         .type = "stasis-internal-channel",
2243 };
2244
2245 static int set_internal_datastore(struct ast_channel *chan)
2246 {
2247         struct ast_datastore *datastore;
2248
2249         datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
2250         if (!datastore) {
2251                 datastore = ast_datastore_alloc(&stasis_internal_channel_info, NULL);
2252                 if (!datastore) {
2253                         return -1;
2254                 }
2255                 ast_channel_datastore_add(chan, datastore);
2256         }
2257         return 0;
2258 }
2259
2260 int stasis_app_channel_unreal_set_internal(struct ast_channel *chan)
2261 {
2262         struct ast_channel *outchan = NULL, *outowner = NULL;
2263         int res = 0;
2264         struct ast_unreal_pvt *unreal_pvt = ast_channel_tech_pvt(chan);
2265
2266         ao2_ref(unreal_pvt, +1);
2267         ast_unreal_lock_all(unreal_pvt, &outowner, &outchan);
2268         if (outowner) {
2269                 res |= set_internal_datastore(outowner);
2270                 ast_channel_unlock(outowner);
2271                 ast_channel_unref(outowner);
2272         }
2273         if (outchan) {
2274                 res |= set_internal_datastore(outchan);
2275                 ast_channel_unlock(outchan);
2276                 ast_channel_unref(outchan);
2277         }
2278         ao2_unlock(unreal_pvt);
2279         ao2_ref(unreal_pvt, -1);
2280         return res;
2281 }
2282
2283 int stasis_app_channel_set_internal(struct ast_channel *chan)
2284 {
2285         int res;
2286
2287         ast_channel_lock(chan);
2288         res = set_internal_datastore(chan);
2289         ast_channel_unlock(chan);
2290
2291         return res;
2292 }
2293
2294 int stasis_app_channel_is_internal(struct ast_channel *chan)
2295 {
2296         struct ast_datastore *datastore;
2297         int res = 0;
2298
2299         ast_channel_lock(chan);
2300         datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
2301         if (datastore) {
2302                 res = 1;
2303         }
2304         ast_channel_unlock(chan);
2305
2306         return res;
2307 }
2308
2309 static int load_module(void)
2310 {
2311         if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
2312                 return AST_MODULE_LOAD_DECLINE;
2313         }
2314         if (STASIS_MESSAGE_TYPE_INIT(end_message_type) != 0) {
2315                 return AST_MODULE_LOAD_DECLINE;
2316         }
2317         apps_registry = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
2318                 APPS_NUM_BUCKETS, app_hash, NULL, app_compare);
2319         app_controls = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
2320                 CONTROLS_NUM_BUCKETS, control_hash, NULL, control_compare);
2321         app_bridges = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
2322                 BRIDGES_NUM_BUCKETS, bridges_hash, NULL, bridges_compare);
2323         app_bridges_moh = ao2_container_alloc_hash(
2324                 AO2_ALLOC_OPT_LOCK_MUTEX, 0,
2325                 37, bridges_channel_hash_fn, NULL, NULL);
2326         app_bridges_playback = ao2_container_alloc_hash(
2327                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
2328                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
2329         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh || !app_bridges_playback) {
2330                 unload_module();
2331                 return AST_MODULE_LOAD_DECLINE;
2332         }
2333
2334         if (messaging_init()) {
2335                 unload_module();
2336                 return AST_MODULE_LOAD_DECLINE;
2337         }
2338
2339         bridge_stasis_init();
2340
2341         stasis_app_register_event_sources();
2342
2343         return AST_MODULE_LOAD_SUCCESS;
2344 }
2345
2346 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis application support",
2347         .load_pri = AST_MODPRI_APP_DEPEND - 1,
2348         .support_level = AST_MODULE_SUPPORT_CORE,
2349         .load = load_module,
2350         .unload = unload_module,
2351 );