git migration: Refactor the ASTERISK_FILE_VERSION macro
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_REGISTER_FILE()
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_endpoints.h"
65 #include "asterisk/stasis_message_router.h"
66 #include "asterisk/strings.h"
67 #include "stasis/app.h"
68 #include "stasis/control.h"
69 #include "stasis/messaging.h"
70 #include "stasis/stasis_bridge.h"
71 #include "asterisk/core_unreal.h"
72 #include "asterisk/musiconhold.h"
73 #include "asterisk/causes.h"
74 #include "asterisk/stringfields.h"
75 #include "asterisk/bridge_after.h"
76 #include "asterisk/format_cache.h"
77
78 /*! Time to wait for a frame in the application */
79 #define MAX_WAIT_MS 200
80
81 /*!
82  * \brief Number of buckets for the Stasis application hash table.  Remember to
83  * keep it a prime number!
84  */
85 #define APPS_NUM_BUCKETS 127
86
87 /*!
88  * \brief Number of buckets for the Stasis application hash table.  Remember to
89  * keep it a prime number!
90  */
91 #define CONTROLS_NUM_BUCKETS 127
92
93 /*!
94  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
95  * keep it a prime number!
96  */
97 #define BRIDGES_NUM_BUCKETS 127
98
99 /*!
100  * \brief Stasis application container.
101  */
102 struct ao2_container *apps_registry;
103
104 struct ao2_container *app_controls;
105
106 struct ao2_container *app_bridges;
107
108 struct ao2_container *app_bridges_moh;
109
110 struct ao2_container *app_bridges_playback;
111
112 static struct ast_json *stasis_end_to_json(struct stasis_message *message,
113                 const struct stasis_message_sanitizer *sanitize)
114 {
115         struct ast_channel_blob *payload = stasis_message_data(message);
116
117         if (sanitize && sanitize->channel_snapshot &&
118                         sanitize->channel_snapshot(payload->snapshot)) {
119                 return NULL;
120         }
121
122         return ast_json_pack("{s: s, s: o, s: o}",
123                 "type", "StasisEnd",
124                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
125                 "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
126 }
127
128 STASIS_MESSAGE_TYPE_DEFN_LOCAL(end_message_type,
129         .to_json = stasis_end_to_json);
130
131 struct start_message_blob {
132         struct ast_channel_snapshot *channel;           /*!< Channel that is entering Stasis() */
133         struct ast_channel_snapshot *replace_channel;   /*!< Channel that is being replaced (optional) */
134         struct ast_json *blob;                          /*!< JSON blob containing timestamp and args */
135 };
136
137 static struct ast_json *stasis_start_to_json(struct stasis_message *message,
138                 const struct stasis_message_sanitizer *sanitize)
139 {
140         struct start_message_blob *payload = stasis_message_data(message);
141         struct ast_json *msg;
142
143         if (sanitize && sanitize->channel_snapshot &&
144                         sanitize->channel_snapshot(payload->channel)) {
145                 return NULL;
146         }
147
148         msg = ast_json_pack("{s: s, s: O, s: O, s: o}",
149                 "type", "StasisStart",
150                 "timestamp", ast_json_object_get(payload->blob, "timestamp"),
151                 "args", ast_json_object_get(payload->blob, "args"),
152                 "channel", ast_channel_snapshot_to_json(payload->channel, NULL));
153         if (!msg) {
154                 ast_log(LOG_ERROR, "Failed to pack JSON for StasisStart message\n");
155                 return NULL;
156         }
157
158         if (payload->replace_channel) {
159                 int res = ast_json_object_set(msg, "replace_channel",
160                         ast_channel_snapshot_to_json(payload->replace_channel, NULL));
161
162                 if (res) {
163                         ast_json_unref(msg);
164                         ast_log(LOG_ERROR, "Failed to append JSON for StasisStart message\n");
165                         return NULL;
166                 }
167         }
168
169         return msg;
170 }
171
172 STASIS_MESSAGE_TYPE_DEFN_LOCAL(start_message_type,
173         .to_json = stasis_start_to_json);
174
175 const char *stasis_app_name(const struct stasis_app *app)
176 {
177         return app_name(app);
178 }
179
180 /*! AO2 hash function for \ref app */
181 static int app_hash(const void *obj, const int flags)
182 {
183         const struct stasis_app *app;
184         const char *key;
185
186         switch (flags & OBJ_SEARCH_MASK) {
187         case OBJ_SEARCH_KEY:
188                 key = obj;
189                 break;
190         case OBJ_SEARCH_OBJECT:
191                 app = obj;
192                 key = stasis_app_name(app);
193                 break;
194         default:
195                 /* Hash can only work on something with a full key. */
196                 ast_assert(0);
197                 return 0;
198         }
199         return ast_str_hash(key);
200 }
201
202 /*! AO2 comparison function for \ref app */
203 static int app_compare(void *obj, void *arg, int flags)
204 {
205         const struct stasis_app *object_left = obj;
206         const struct stasis_app *object_right = arg;
207         const char *right_key = arg;
208         int cmp;
209
210         switch (flags & OBJ_SEARCH_MASK) {
211         case OBJ_SEARCH_OBJECT:
212                 right_key = stasis_app_name(object_right);
213                 /* Fall through */
214         case OBJ_SEARCH_KEY:
215                 cmp = strcmp(stasis_app_name(object_left), right_key);
216                 break;
217         case OBJ_SEARCH_PARTIAL_KEY:
218                 /*
219                  * We could also use a partial key struct containing a length
220                  * so strlen() does not get called for every comparison instead.
221                  */
222                 cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
223                 break;
224         default:
225                 /*
226                  * What arg points to is specific to this traversal callback
227                  * and has no special meaning to astobj2.
228                  */
229                 cmp = 0;
230                 break;
231         }
232         if (cmp) {
233                 return 0;
234         }
235         /*
236          * At this point the traversal callback is identical to a sorted
237          * container.
238          */
239         return CMP_MATCH;
240 }
241
242 /*! AO2 hash function for \ref stasis_app_control */
243 static int control_hash(const void *obj, const int flags)
244 {
245         const struct stasis_app_control *control;
246         const char *key;
247
248         switch (flags & OBJ_SEARCH_MASK) {
249         case OBJ_SEARCH_KEY:
250                 key = obj;
251                 break;
252         case OBJ_SEARCH_OBJECT:
253                 control = obj;
254                 key = stasis_app_control_get_channel_id(control);
255                 break;
256         default:
257                 /* Hash can only work on something with a full key. */
258                 ast_assert(0);
259                 return 0;
260         }
261         return ast_str_hash(key);
262 }
263
264 /*! AO2 comparison function for \ref stasis_app_control */
265 static int control_compare(void *obj, void *arg, int flags)
266 {
267         const struct stasis_app_control *object_left = obj;
268         const struct stasis_app_control *object_right = arg;
269         const char *right_key = arg;
270         int cmp;
271
272         switch (flags & OBJ_SEARCH_MASK) {
273         case OBJ_SEARCH_OBJECT:
274                 right_key = stasis_app_control_get_channel_id(object_right);
275                 /* Fall through */
276         case OBJ_SEARCH_KEY:
277                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
278                 break;
279         case OBJ_SEARCH_PARTIAL_KEY:
280                 /*
281                  * We could also use a partial key struct containing a length
282                  * so strlen() does not get called for every comparison instead.
283                  */
284                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
285                 break;
286         default:
287                 /*
288                  * What arg points to is specific to this traversal callback
289                  * and has no special meaning to astobj2.
290                  */
291                 cmp = 0;
292                 break;
293         }
294         if (cmp) {
295                 return 0;
296         }
297         /*
298          * At this point the traversal callback is identical to a sorted
299          * container.
300          */
301         return CMP_MATCH;
302 }
303
304 static int cleanup_cb(void *obj, void *arg, int flags)
305 {
306         struct stasis_app *app = obj;
307
308         if (!app_is_finished(app)) {
309                 return 0;
310         }
311
312         ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
313         app_shutdown(app);
314
315         return CMP_MATCH;
316
317 }
318
319 /*!
320  * \brief Clean up any old apps that we don't need any more.
321  */
322 static void cleanup(void)
323 {
324         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
325                 cleanup_cb, NULL);
326 }
327
328 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
329 {
330         return control_create(chan, NULL);
331 }
332
333 struct stasis_app_control *stasis_app_control_find_by_channel(
334         const struct ast_channel *chan)
335 {
336         if (chan == NULL) {
337                 return NULL;
338         }
339
340         return stasis_app_control_find_by_channel_id(
341                 ast_channel_uniqueid(chan));
342 }
343
344 struct stasis_app_control *stasis_app_control_find_by_channel_id(
345         const char *channel_id)
346 {
347         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
348 }
349
350 /*! AO2 hash function for bridges container  */
351 static int bridges_hash(const void *obj, const int flags)
352 {
353         const struct ast_bridge *bridge;
354         const char *key;
355
356         switch (flags & OBJ_SEARCH_MASK) {
357         case OBJ_SEARCH_KEY:
358                 key = obj;
359                 break;
360         case OBJ_SEARCH_OBJECT:
361                 bridge = obj;
362                 key = bridge->uniqueid;
363                 break;
364         default:
365                 /* Hash can only work on something with a full key. */
366                 ast_assert(0);
367                 return 0;
368         }
369         return ast_str_hash(key);
370 }
371
372 /*! AO2 comparison function for bridges container */
373 static int bridges_compare(void *obj, void *arg, int flags)
374 {
375         const struct ast_bridge *object_left = obj;
376         const struct ast_bridge *object_right = arg;
377         const char *right_key = arg;
378         int cmp;
379
380         switch (flags & OBJ_SEARCH_MASK) {
381         case OBJ_SEARCH_OBJECT:
382                 right_key = object_right->uniqueid;
383                 /* Fall through */
384         case OBJ_SEARCH_KEY:
385                 cmp = strcmp(object_left->uniqueid, right_key);
386                 break;
387         case OBJ_SEARCH_PARTIAL_KEY:
388                 /*
389                  * We could also use a partial key struct containing a length
390                  * so strlen() does not get called for every comparison instead.
391                  */
392                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
393                 break;
394         default:
395                 /*
396                  * What arg points to is specific to this traversal callback
397                  * and has no special meaning to astobj2.
398                  */
399                 cmp = 0;
400                 break;
401         }
402         if (cmp) {
403                 return 0;
404         }
405         /*
406          * At this point the traversal callback is identical to a sorted
407          * container.
408          */
409         return CMP_MATCH;
410 }
411
412 /*!
413  *  Used with app_bridges_moh and app_bridge_control, they provide links
414  *  between bridges and channels used for ARI application purposes
415  */
416 struct stasis_app_bridge_channel_wrapper {
417         AST_DECLARE_STRING_FIELDS(
418                 AST_STRING_FIELD(channel_id);
419                 AST_STRING_FIELD(bridge_id);
420         );
421 };
422
423 static void stasis_app_bridge_channel_wrapper_destructor(void *obj)
424 {
425         struct stasis_app_bridge_channel_wrapper *wrapper = obj;
426         ast_string_field_free_memory(wrapper);
427 }
428
429 /*! AO2 hash function for the bridges moh container */
430 static int bridges_channel_hash_fn(const void *obj, const int flags)
431 {
432         const struct stasis_app_bridge_channel_wrapper *wrapper;
433         const char *key;
434
435         switch (flags & OBJ_SEARCH_MASK) {
436         case OBJ_SEARCH_KEY:
437                 key = obj;
438                 break;
439         case OBJ_SEARCH_OBJECT:
440                 wrapper = obj;
441                 key = wrapper->bridge_id;
442                 break;
443         default:
444                 /* Hash can only work on something with a full key. */
445                 ast_assert(0);
446                 return 0;
447         }
448         return ast_str_hash(key);
449 }
450
451 static int bridges_channel_sort_fn(const void *obj_left, const void *obj_right, const int flags)
452 {
453         const struct stasis_app_bridge_channel_wrapper *left = obj_left;
454         const struct stasis_app_bridge_channel_wrapper *right = obj_right;
455         const char *right_key = obj_right;
456         int cmp;
457
458         switch (flags & OBJ_SEARCH_MASK) {
459         case OBJ_SEARCH_OBJECT:
460                 right_key = right->bridge_id;
461                 /* Fall through */
462         case OBJ_SEARCH_KEY:
463                 cmp = strcmp(left->bridge_id, right_key);
464                 break;
465         case OBJ_SEARCH_PARTIAL_KEY:
466                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
467                 break;
468         default:
469                 /* Sort can only work on something with a full or partial key. */
470                 ast_assert(0);
471                 cmp = 0;
472                 break;
473         }
474         return cmp;
475 }
476
477 /*! Removes the bridge to music on hold channel link */
478 static void remove_bridge_moh(char *bridge_id)
479 {
480         ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
481         ast_free(bridge_id);
482 }
483
484 /*! After bridge failure callback for moh channels */
485 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
486 {
487         char *bridge_id = data;
488
489         remove_bridge_moh(bridge_id);
490 }
491
492 /*! After bridge callback for moh channels */
493 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
494 {
495         char *bridge_id = data;
496
497         remove_bridge_moh(bridge_id);
498 }
499
500 /*! Request a bridge MOH channel */
501 static struct ast_channel *prepare_bridge_moh_channel(void)
502 {
503         RAII_VAR(struct ast_format_cap *, cap, NULL, ao2_cleanup);
504
505         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
506         if (!cap) {
507                 return NULL;
508         }
509
510         ast_format_cap_append(cap, ast_format_slin, 0);
511
512         return ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL);
513 }
514
515 /*! Provides the moh channel with a thread so it can actually play its music */
516 static void *moh_channel_thread(void *data)
517 {
518         struct ast_channel *moh_channel = data;
519
520         while (!ast_safe_sleep(moh_channel, 1000)) {
521         }
522
523         ast_moh_stop(moh_channel);
524         ast_hangup(moh_channel);
525
526         return NULL;
527 }
528
529 /*!
530  * \internal
531  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
532  *
533  * \param bridge Which bridge this moh channel exists for
534  *
535  * \retval NULL if the channel could not be created, pushed, or linked
536  * \retval Reference to the channel on success
537  */
538 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
539 {
540         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
541         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
542         struct ast_channel *chan;
543         pthread_t threadid;
544
545         if (!bridge_id) {
546                 return NULL;
547         }
548
549         chan = prepare_bridge_moh_channel();
550         if (!chan) {
551                 return NULL;
552         }
553
554         if (stasis_app_channel_unreal_set_internal(chan)) {
555                 ast_hangup(chan);
556                 return NULL;
557         }
558
559         /* The after bridge callback assumes responsibility of the bridge_id. */
560         if (ast_bridge_set_after_callback(chan,
561                 moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
562                 ast_hangup(chan);
563                 return NULL;
564         }
565         bridge_id = NULL;
566
567         if (ast_unreal_channel_push_to_bridge(chan, bridge,
568                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
569                 ast_hangup(chan);
570                 return NULL;
571         }
572
573         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
574                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
575         if (!new_wrapper) {
576                 ast_hangup(chan);
577                 return NULL;
578         }
579
580         if (ast_string_field_init(new_wrapper, 32)) {
581                 ast_hangup(chan);
582                 return NULL;
583         }
584         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
585         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
586
587         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
588                 ast_hangup(chan);
589                 return NULL;
590         }
591
592         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
593                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
594                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
595                 ast_hangup(chan);
596                 return NULL;
597         }
598
599         return chan;
600 }
601
602 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
603 {
604         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
605
606         {
607                 SCOPED_AO2LOCK(lock, app_bridges_moh);
608
609                 moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
610                 if (!moh_wrapper) {
611                         return bridge_moh_create(bridge);
612                 }
613         }
614
615         return ast_channel_get_by_name(moh_wrapper->channel_id);
616 }
617
618 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
619 {
620         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
621         struct ast_channel *chan;
622
623         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
624         if (!moh_wrapper) {
625                 return -1;
626         }
627
628         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
629         if (!chan) {
630                 return -1;
631         }
632
633         ast_moh_stop(chan);
634         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
635         ao2_cleanup(chan);
636
637         return 0;
638 }
639
640 /*! Removes the bridge to playback channel link */
641 static void remove_bridge_playback(char *bridge_id)
642 {
643         struct stasis_app_bridge_channel_wrapper *wrapper;
644         struct stasis_app_control *control;
645
646         wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
647
648         if (wrapper) {
649                 control = stasis_app_control_find_by_channel_id(wrapper->channel_id);
650                 if (control) {
651                         ao2_unlink(app_controls, control);
652                         ao2_ref(control, -1);
653                 }
654                 ao2_ref(wrapper, -1);
655         }
656         ast_free(bridge_id);
657 }
658
659 static void playback_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
660 {
661         char *bridge_id = data;
662
663         remove_bridge_playback(bridge_id);
664 }
665
666 static void playback_after_bridge_cb(struct ast_channel *chan, void *data)
667 {
668         char *bridge_id = data;
669
670         remove_bridge_playback(bridge_id);
671 }
672
673 int stasis_app_bridge_playback_channel_add(struct ast_bridge *bridge,
674         struct ast_channel *chan,
675         struct stasis_app_control *control)
676 {
677         RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
678         char *bridge_id = ast_strdup(bridge->uniqueid);
679
680         if (!bridge_id) {
681                 return -1;
682         }
683
684         if (ast_bridge_set_after_callback(chan,
685                 playback_after_bridge_cb, playback_after_bridge_cb_failed, bridge_id)) {
686                 ast_free(bridge_id);
687                 return -1;
688         }
689
690         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
691                 stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
692         if (!new_wrapper) {
693                 return -1;
694         }
695
696         if (ast_string_field_init(new_wrapper, 32)) {
697                 return -1;
698         }
699
700         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
701         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
702
703         if (!ao2_link(app_bridges_playback, new_wrapper)) {
704                 return -1;
705         }
706
707         ao2_link(app_controls, control);
708         return 0;
709 }
710
711 struct ast_channel *stasis_app_bridge_playback_channel_find(struct ast_bridge *bridge)
712 {
713         struct stasis_app_bridge_channel_wrapper *playback_wrapper;
714         struct ast_channel *chan;
715
716         playback_wrapper = ao2_find(app_bridges_playback, bridge->uniqueid, OBJ_SEARCH_KEY);
717         if (!playback_wrapper) {
718                 return NULL;
719         }
720
721         chan = ast_channel_get_by_name(playback_wrapper->channel_id);
722         ao2_ref(playback_wrapper, -1);
723         return chan;
724 }
725
726 struct ast_bridge *stasis_app_bridge_find_by_id(
727         const char *bridge_id)
728 {
729         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
730 }
731
732
733 /*!
734  * \brief In addition to running ao2_cleanup(), this function also removes the
735  * object from the app_controls container.
736  */
737 static void control_unlink(struct stasis_app_control *control)
738 {
739         if (!control) {
740                 return;
741         }
742
743         ao2_unlink(app_controls, control);
744         ao2_cleanup(control);
745 }
746
747 struct ast_bridge *stasis_app_bridge_create(const char *type, const char *name, const char *id)
748 {
749         struct ast_bridge *bridge;
750         char *requested_type, *requested_types = ast_strdupa(S_OR(type, "mixing"));
751         int capabilities = 0;
752         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
753                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
754                 | AST_BRIDGE_FLAG_TRANSFER_BRIDGE_ONLY;
755
756         while ((requested_type = strsep(&requested_types, ","))) {
757                 requested_type = ast_strip(requested_type);
758
759                 if (!strcmp(requested_type, "mixing")) {
760                         capabilities |= STASIS_BRIDGE_MIXING_CAPABILITIES;
761                         flags |= AST_BRIDGE_FLAG_SMART;
762                 } else if (!strcmp(requested_type, "holding")) {
763                         capabilities |= AST_BRIDGE_CAPABILITY_HOLDING;
764                 } else if (!strcmp(requested_type, "dtmf_events") ||
765                         !strcmp(requested_type, "proxy_media")) {
766                         capabilities &= ~AST_BRIDGE_CAPABILITY_NATIVE;
767                 }
768         }
769
770         if (!capabilities
771                 /* Holding and mixing capabilities don't mix. */
772                 || ((capabilities & AST_BRIDGE_CAPABILITY_HOLDING)
773                         && (capabilities & (STASIS_BRIDGE_MIXING_CAPABILITIES)))) {
774                 return NULL;
775         }
776
777         bridge = bridge_stasis_new(capabilities, flags, name, id);
778         if (bridge) {
779                 if (!ao2_link(app_bridges, bridge)) {
780                         ast_bridge_destroy(bridge, 0);
781                         bridge = NULL;
782                 }
783         }
784         return bridge;
785 }
786
787 void stasis_app_bridge_destroy(const char *bridge_id)
788 {
789         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
790         if (!bridge) {
791                 return;
792         }
793         ao2_unlink(app_bridges, bridge);
794         ast_bridge_destroy(bridge, 0);
795 }
796
797 struct replace_channel_store {
798         struct ast_channel_snapshot *snapshot;
799         char *app;
800 };
801
802 static void replace_channel_destroy(void *obj)
803 {
804         struct replace_channel_store *replace = obj;
805
806         ao2_cleanup(replace->snapshot);
807         ast_free(replace->app);
808         ast_free(replace);
809 }
810
811 static const struct ast_datastore_info replace_channel_store_info = {
812         .type = "replace-channel-store",
813         .destroy = replace_channel_destroy,
814 };
815
816 static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create)
817 {
818         struct ast_datastore *datastore;
819
820         SCOPED_CHANNELLOCK(lock, chan);
821         datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL);
822         if (!datastore) {
823                 if (no_create) {
824                         return NULL;
825                 }
826
827                 datastore = ast_datastore_alloc(&replace_channel_store_info, NULL);
828                 if (!datastore) {
829                         return NULL;
830                 }
831                 ast_channel_datastore_add(chan, datastore);
832         }
833
834         if (!datastore->data) {
835                 datastore->data = ast_calloc(1, sizeof(struct replace_channel_store));
836         }
837         return datastore->data;
838 }
839
840 int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
841 {
842         struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
843
844         if (!replace) {
845                 return -1;
846         }
847
848         ao2_replace(replace->snapshot, replace_snapshot);
849         return 0;
850 }
851
852 int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app)
853 {
854         struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
855
856         if (!replace) {
857                 return -1;
858         }
859
860         ast_free(replace->app);
861         replace->app = NULL;
862
863         if (replace_app) {
864                 replace->app = ast_strdup(replace_app);
865                 if (!replace->app) {
866                         return -1;
867                 }
868         }
869
870         return 0;
871 }
872
873 static struct ast_channel_snapshot *get_replace_channel_snapshot(struct ast_channel *chan)
874 {
875         struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
876         struct ast_channel_snapshot *replace_channel_snapshot;
877
878         if (!replace) {
879                 return NULL;
880         }
881
882         replace_channel_snapshot = replace->snapshot;
883         replace->snapshot = NULL;
884
885         return replace_channel_snapshot;
886 }
887
888 char *app_get_replace_channel_app(struct ast_channel *chan)
889 {
890         struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
891         char *replace_channel_app;
892
893         if (!replace) {
894                 return NULL;
895         }
896
897         replace_channel_app = replace->app;
898         replace->app = NULL;
899
900         return replace_channel_app;
901 }
902
903 static void start_message_blob_dtor(void *obj)
904 {
905         struct start_message_blob *payload = obj;
906
907         ao2_cleanup(payload->channel);
908         ao2_cleanup(payload->replace_channel);
909         ast_json_unref(payload->blob);
910 }
911
912 static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app *app,
913         int argc, char *argv[], struct ast_channel_snapshot *snapshot,
914         struct ast_channel_snapshot *replace_channel_snapshot)
915 {
916         RAII_VAR(struct ast_json *, json_blob, NULL, ast_json_unref);
917         struct ast_json *json_args;
918         RAII_VAR(struct start_message_blob *, payload, NULL, ao2_cleanup);
919         struct stasis_message *msg;
920         int i;
921
922         if (app_subscribe_channel(app, chan)) {
923                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
924                         app_name(app), ast_channel_name(chan));
925                 return -1;
926         }
927
928         payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
929         if (!payload) {
930                 ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
931                 return -1;
932         }
933
934         payload->channel = ao2_bump(snapshot);
935         payload->replace_channel = ao2_bump(replace_channel_snapshot);
936
937         json_blob = ast_json_pack("{s: s, s: o, s: []}",
938                 "app", app_name(app),
939                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
940                 "args");
941         if (!json_blob) {
942                 ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
943                 return -1;
944         }
945
946         /* Append arguments to args array */
947         json_args = ast_json_object_get(json_blob, "args");
948         ast_assert(json_args != NULL);
949         for (i = 0; i < argc; ++i) {
950                 int r = ast_json_array_append(json_args,
951                                               ast_json_string_create(argv[i]));
952                 if (r != 0) {
953                         ast_log(LOG_ERROR, "Error appending to StasisStart message\n");
954                         return -1;
955                 }
956         }
957
958         payload->blob = ast_json_ref(json_blob);
959
960         msg = stasis_message_create(start_message_type(), payload);
961         if (!msg) {
962                 ast_log(LOG_ERROR, "Error sending StasisStart message\n");
963                 return -1;
964         }
965
966         if (replace_channel_snapshot) {
967                 app_unsubscribe_channel_id(app, replace_channel_snapshot->uniqueid);
968         }
969         stasis_publish(ast_app_get_topic(app), msg);
970         ao2_ref(msg, -1);
971         return 0;
972 }
973
974 static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
975         int argc, char *argv[])
976 {
977         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
978         RAII_VAR(struct ast_channel_snapshot *, replace_channel_snapshot,
979                 NULL, ao2_cleanup);
980
981         ast_assert(chan != NULL);
982
983         replace_channel_snapshot = get_replace_channel_snapshot(chan);
984
985         /* Set channel info */
986         ast_channel_lock(chan);
987         snapshot = ast_channel_snapshot_create(chan);
988         ast_channel_unlock(chan);
989         if (!snapshot) {
990                 return -1;
991         }
992         return send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot);
993 }
994
995 static void remove_masquerade_store(struct ast_channel *chan);
996
997 int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
998 {
999         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
1000         struct ast_json *blob;
1001         struct stasis_message *msg;
1002
1003         if (sanitize && sanitize->channel
1004                 && sanitize->channel(chan)) {
1005                 return 0;
1006         }
1007
1008         blob = ast_json_pack("{s: s}", "app", app_name(app));
1009         if (!blob) {
1010                 ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n");
1011                 return -1;
1012         }
1013
1014         remove_masquerade_store(chan);
1015         app_unsubscribe_channel(app, chan);
1016         msg = ast_channel_blob_create(chan, end_message_type(), blob);
1017         if (msg) {
1018                 stasis_publish(ast_app_get_topic(app), msg);
1019         }
1020         ao2_cleanup(msg);
1021         ast_json_unref(blob);
1022
1023         return 0;
1024 }
1025
1026 static int masq_match_cb(void *obj, void *data, int flags)
1027 {
1028         struct stasis_app_control *control = obj;
1029         struct ast_channel *chan = data;
1030
1031         if (!strcmp(ast_channel_uniqueid(chan),
1032                 stasis_app_control_get_channel_id(control))) {
1033                 return CMP_MATCH;
1034         }
1035
1036         return 0;
1037 }
1038
1039 static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1040 {
1041         struct stasis_app_control *control;
1042
1043         /* find control */
1044         control = ao2_callback(app_controls, 0, masq_match_cb, old_chan);
1045         if (!control) {
1046                 ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
1047                 return;
1048         }
1049
1050         /* send the StasisEnd message to the app */
1051         stasis_app_channel_set_stasis_end_published(new_chan);
1052         app_send_end_msg(control_app(control), new_chan);
1053
1054         /* remove the datastore */
1055         remove_masquerade_store(old_chan);
1056
1057         ao2_cleanup(control);
1058 }
1059
1060 static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
1061 {
1062         RAII_VAR(struct ast_channel_snapshot *, new_snapshot, NULL, ao2_cleanup);
1063         RAII_VAR(struct ast_channel_snapshot *, old_snapshot, NULL, ao2_cleanup);
1064         struct stasis_app_control *control;
1065
1066         /* At this point, new_chan is the channel pointer that is in Stasis() and
1067          * has the unknown channel's name in it while old_chan is the channel pointer
1068          * that is not in Stasis(), but has the guts of the channel that Stasis() knows
1069          * about */
1070
1071         /* grab a snapshot for the channel that is jumping into Stasis() */
1072         new_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
1073         if (!new_snapshot) {
1074                 ast_log(LOG_ERROR, "Could not get snapshot for masquerading channel\n");
1075                 return;
1076         }
1077
1078         /* grab a snapshot for the channel that has been kicked out of Stasis() */
1079         old_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(old_chan));
1080         if (!old_snapshot) {
1081                 ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
1082                 return;
1083         }
1084
1085         /* find, unlink, and relink control since the channel has a new name and
1086          * its hash has likely changed */
1087         control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, new_chan);
1088         if (!control) {
1089                 ast_log(LOG_ERROR, "Could not find control for masquerading channel\n");
1090                 return;
1091         }
1092         ao2_link(app_controls, control);
1093
1094
1095         /* send the StasisStart with replace_channel to the app */
1096         send_start_msg_snapshots(new_chan, control_app(control), 0, NULL, new_snapshot,
1097                 old_snapshot);
1098         /* send the StasisEnd message to the app */
1099         app_send_end_msg(control_app(control), old_chan);
1100
1101         ao2_cleanup(control);
1102 }
1103
1104 static const struct ast_datastore_info masquerade_store_info = {
1105         .type = "stasis-masqerade",
1106         .chan_fixup = channel_stolen_cb,
1107         .chan_breakdown = channel_replaced_cb,
1108 };
1109
1110 static int has_masquerade_store(struct ast_channel *chan)
1111 {
1112         SCOPED_CHANNELLOCK(lock, chan);
1113         return !!ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
1114 }
1115
1116 static int add_masquerade_store(struct ast_channel *chan)
1117 {
1118         struct ast_datastore *datastore;
1119
1120         SCOPED_CHANNELLOCK(lock, chan);
1121         if (ast_channel_datastore_find(chan, &masquerade_store_info, NULL)) {
1122                 return 0;
1123         }
1124
1125         datastore = ast_datastore_alloc(&masquerade_store_info, NULL);
1126         if (!datastore) {
1127                 return -1;
1128         }
1129
1130         ast_channel_datastore_add(chan, datastore);
1131
1132         return 0;
1133 }
1134
1135 static void remove_masquerade_store(struct ast_channel *chan)
1136 {
1137         struct ast_datastore *datastore;
1138
1139         SCOPED_CHANNELLOCK(lock, chan);
1140         datastore = ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
1141         if (!datastore) {
1142                 return;
1143         }
1144
1145         ast_channel_datastore_remove(chan, datastore);
1146         ast_datastore_free(datastore);
1147 }
1148
1149 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
1150 {
1151         while (!control_is_done(control)) {
1152                 int command_count;
1153                 command_count = control_dispatch_all(control, chan);
1154
1155                 ao2_lock(control);
1156
1157                 if (control_command_count(control)) {
1158                         /* If the command queue isn't empty, something added to the queue before it was locked. */
1159                         ao2_unlock(control);
1160                         continue;
1161                 }
1162
1163                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
1164                         control_mark_done(control);
1165                         ao2_unlock(control);
1166                         break;
1167                 }
1168                 ao2_unlock(control);
1169         }
1170 }
1171
1172 int stasis_app_control_is_done(struct stasis_app_control *control)
1173 {
1174         return control_is_done(control);
1175 }
1176
1177 struct ast_datastore_info set_end_published_info = {
1178         .type = "stasis_end_published",
1179 };
1180
1181 void stasis_app_channel_set_stasis_end_published(struct ast_channel *chan)
1182 {
1183         struct ast_datastore *datastore;
1184
1185         datastore = ast_datastore_alloc(&set_end_published_info, NULL);
1186
1187         ast_channel_lock(chan);
1188         ast_channel_datastore_add(chan, datastore);
1189         ast_channel_unlock(chan);
1190 }
1191
1192 int stasis_app_channel_is_stasis_end_published(struct ast_channel *chan)
1193 {
1194         struct ast_datastore *datastore;
1195
1196         ast_channel_lock(chan);
1197         datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
1198         ast_channel_unlock(chan);
1199
1200         return datastore ? 1 : 0;
1201 }
1202
1203 static void remove_stasis_end_published(struct ast_channel *chan)
1204 {
1205         struct ast_datastore *datastore;
1206
1207         ast_channel_lock(chan);
1208         datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
1209         ast_channel_unlock(chan);
1210
1211         if (datastore) {
1212                 ast_channel_datastore_remove(chan, datastore);
1213                 ast_datastore_free(datastore);
1214         }
1215 }
1216
1217 /*! /brief Stasis dialplan application callback */
1218 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
1219                     char *argv[])
1220 {
1221         SCOPED_MODULE_USE(ast_module_info->self);
1222
1223         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1224         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
1225         struct ast_bridge *bridge = NULL;
1226         int res = 0;
1227         int needs_depart;
1228
1229         ast_assert(chan != NULL);
1230
1231         /* Just in case there's a lingering indication that the channel has had a stasis
1232          * end published on it, remove that now.
1233          */
1234         remove_stasis_end_published(chan);
1235
1236         if (!apps_registry) {
1237                 return -1;
1238         }
1239
1240         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1241         if (!app) {
1242                 ast_log(LOG_ERROR,
1243                         "Stasis app '%s' not registered\n", app_name);
1244                 return -1;
1245         }
1246         if (!app_is_active(app)) {
1247                 ast_log(LOG_ERROR,
1248                         "Stasis app '%s' not active\n", app_name);
1249                 return -1;
1250         }
1251
1252         control = control_create(chan, app);
1253         if (!control) {
1254                 ast_log(LOG_ERROR, "Allocated failed\n");
1255                 return -1;
1256         }
1257         ao2_link(app_controls, control);
1258
1259         if (add_masquerade_store(chan)) {
1260                 ast_log(LOG_ERROR, "Failed to attach masquerade detector\n");
1261                 return -1;
1262         }
1263
1264         res = send_start_msg(app, chan, argc, argv);
1265         if (res != 0) {
1266                 ast_log(LOG_ERROR,
1267                         "Error sending start message to '%s'\n", app_name);
1268                 remove_masquerade_store(chan);
1269                 return -1;
1270         }
1271
1272         /* Pull queued prestart commands and execute */
1273         control_prestart_dispatch_all(control, chan);
1274
1275         while (!control_is_done(control)) {
1276                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
1277                 int r;
1278                 int command_count;
1279                 RAII_VAR(struct ast_bridge *, last_bridge, NULL, ao2_cleanup);
1280
1281                 /* Check to see if a bridge absorbed our hangup frame */
1282                 if (ast_check_hangup_locked(chan)) {
1283                         break;
1284                 }
1285
1286                 last_bridge = bridge;
1287                 bridge = ao2_bump(stasis_app_get_bridge(control));
1288
1289                 if (bridge != last_bridge) {
1290                         app_unsubscribe_bridge(app, last_bridge);
1291                         app_subscribe_bridge(app, bridge);
1292                 }
1293
1294                 if (bridge) {
1295                         /* Bridge is handling channel frames */
1296                         control_wait(control);
1297                         control_dispatch_all(control, chan);
1298                         continue;
1299                 }
1300
1301                 r = ast_waitfor(chan, MAX_WAIT_MS);
1302
1303                 if (r < 0) {
1304                         ast_debug(3, "%s: Poll error\n",
1305                                   ast_channel_uniqueid(chan));
1306                         break;
1307                 }
1308
1309                 command_count = control_dispatch_all(control, chan);
1310
1311                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
1312                         /* Command drained the channel; wait for next frame */
1313                         continue;
1314                 }
1315
1316                 if (r == 0) {
1317                         /* Timeout */
1318                         continue;
1319                 }
1320
1321                 f = ast_read(chan);
1322                 if (!f) {
1323                         /* Continue on in the dialplan */
1324                         ast_debug(3, "%s: Hangup (no more frames)\n",
1325                                 ast_channel_uniqueid(chan));
1326                         break;
1327                 }
1328
1329                 if (f->frametype == AST_FRAME_CONTROL) {
1330                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
1331                                 /* Continue on in the dialplan */
1332                                 ast_debug(3, "%s: Hangup\n",
1333                                         ast_channel_uniqueid(chan));
1334                                 break;
1335                         }
1336                 }
1337         }
1338
1339         ast_channel_lock(chan);
1340         needs_depart = ast_channel_is_bridged(chan);
1341         ast_channel_unlock(chan);
1342         if (needs_depart) {
1343                 ast_bridge_depart(chan);
1344         }
1345
1346         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
1347         ao2_cleanup(bridge);
1348
1349         /* Only publish a stasis_end event if it hasn't already been published */
1350         if (!stasis_app_channel_is_stasis_end_published(chan)) {
1351                 /* A masquerade has occurred and this message will be wrong so it
1352                  * has already been sent elsewhere. */
1353                 res = has_masquerade_store(chan) && app_send_end_msg(app, chan);
1354                 if (res != 0) {
1355                         ast_log(LOG_ERROR,
1356                                 "Error sending end message to %s\n", app_name);
1357                         return res;
1358                 }
1359         } else {
1360                 remove_stasis_end_published(chan);
1361         }
1362
1363         /* There's an off chance that app is ready for cleanup. Go ahead
1364          * and clean up, just in case
1365          */
1366         cleanup();
1367
1368         /* The control needs to be removed from the controls container in
1369          * case a new PBX is started and ends up coming back into Stasis.
1370          */
1371         ao2_cleanup(app);
1372         app = NULL;
1373         control_unlink(control);
1374         control = NULL;
1375
1376         if (!ast_channel_pbx(chan)) {
1377                 int chan_hungup;
1378
1379                 /* The ASYNCGOTO softhangup flag may have broken the channel out of
1380                  * its bridge to run dialplan, so if there's no pbx on the channel
1381                  * let it run dialplan here. Otherwise, it will run when this
1382                  * application exits. */
1383                 ast_channel_lock(chan);
1384                 ast_channel_clear_softhangup(chan, AST_SOFTHANGUP_ASYNCGOTO);
1385                 chan_hungup = ast_check_hangup(chan);
1386                 ast_channel_unlock(chan);
1387
1388                 if (!chan_hungup) {
1389                         struct ast_pbx_args pbx_args;
1390
1391                         memset(&pbx_args, 0, sizeof(pbx_args));
1392                         pbx_args.no_hangup_chan = 1;
1393
1394                         res = ast_pbx_run_args(chan, &pbx_args);
1395                 }
1396         }
1397
1398         return res;
1399 }
1400
1401 int stasis_app_send(const char *app_name, struct ast_json *message)
1402 {
1403         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1404
1405         if (!apps_registry) {
1406                 return -1;
1407         }
1408
1409         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1410         if (!app) {
1411                 /* XXX We can do a better job handling late binding, queueing up
1412                  * the call for a few seconds to wait for the app to register.
1413                  */
1414                 ast_log(LOG_WARNING,
1415                         "Stasis app '%s' not registered\n", app_name);
1416                 return -1;
1417         }
1418         app_send(app, message);
1419         return 0;
1420 }
1421
1422 static struct stasis_app *find_app_by_name(const char *app_name)
1423 {
1424         struct stasis_app *res = NULL;
1425
1426         if (!apps_registry) {
1427                 return NULL;
1428         }
1429
1430         if (!ast_strlen_zero(app_name)) {
1431                 res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1432         }
1433
1434         if (!res) {
1435                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
1436                         app_name ? : "(null)");
1437         }
1438         return res;
1439 }
1440
1441 static int append_name(void *obj, void *arg, int flags)
1442 {
1443         struct stasis_app *app = obj;
1444         struct ao2_container *apps = arg;
1445
1446         ast_str_container_add(apps, stasis_app_name(app));
1447         return 0;
1448 }
1449
1450 struct ao2_container *stasis_app_get_all(void)
1451 {
1452         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
1453
1454         if (!apps_registry) {
1455                 return NULL;
1456         }
1457
1458         apps = ast_str_container_alloc(1);
1459         if (!apps) {
1460                 return NULL;
1461         }
1462
1463         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
1464
1465         return ao2_bump(apps);
1466 }
1467
1468 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
1469 {
1470         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1471
1472         if (!apps_registry) {
1473                 return -1;
1474         }
1475
1476         ao2_lock(apps_registry);
1477         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1478         if (app) {
1479                 app_update(app, handler, data);
1480         } else {
1481                 app = app_create(app_name, handler, data);
1482                 if (app) {
1483                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
1484                 } else {
1485                         ao2_unlock(apps_registry);
1486                         return -1;
1487                 }
1488         }
1489
1490         /* We lazily clean up the apps_registry, because it's good enough to
1491          * prevent memory leaks, and we're lazy.
1492          */
1493         cleanup();
1494         ao2_unlock(apps_registry);
1495         return 0;
1496 }
1497
1498 void stasis_app_unregister(const char *app_name)
1499 {
1500         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
1501
1502         if (!app_name) {
1503                 return;
1504         }
1505
1506         if (!apps_registry) {
1507                 return;
1508         }
1509
1510         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1511         if (!app) {
1512                 ast_log(LOG_ERROR,
1513                         "Stasis app '%s' not registered\n", app_name);
1514                 return;
1515         }
1516
1517         app_deactivate(app);
1518
1519         /* There's a decent chance that app is ready for cleanup. Go ahead
1520          * and clean up, just in case
1521          */
1522         cleanup();
1523 }
1524
1525 /*!
1526  * \internal \brief List of registered event sources.
1527  */
1528 AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
1529
1530 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
1531 {
1532         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1533         AST_LIST_INSERT_TAIL(&event_sources, obj, next);
1534         /* only need to bump the module ref on non-core sources because the
1535            core ones are [un]registered by this module. */
1536         if (!stasis_app_is_core_event_source(obj)) {
1537                 ast_module_ref(ast_module_info->self);
1538         }
1539 }
1540
1541 void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
1542 {
1543         struct stasis_app_event_source *source;
1544         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1545         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
1546                 if (source == obj) {
1547                         AST_RWLIST_REMOVE_CURRENT(next);
1548                         if (!stasis_app_is_core_event_source(obj)) {
1549                                 ast_module_unref(ast_module_info->self);
1550                         }
1551                         break;
1552                 }
1553         }
1554         AST_RWLIST_TRAVERSE_SAFE_END;
1555 }
1556
1557 /*!
1558  * \internal
1559  * \brief Convert event source data to JSON.
1560  *
1561  * Calls each event source that has a "to_json" handler allowing each
1562  * source to add data to the given JSON object.
1563  *
1564  * \param app application associated with the event source
1565  * \param json a json object to "fill"
1566  *
1567  * \retval The given json object.
1568  */
1569 static struct ast_json *app_event_sources_to_json(
1570         const struct stasis_app *app, struct ast_json *json)
1571 {
1572         struct stasis_app_event_source *source;
1573         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1574         AST_LIST_TRAVERSE(&event_sources, source, next) {
1575                 if (source->to_json) {
1576                         source->to_json(app, json);
1577                 }
1578         }
1579         return json;
1580 }
1581
1582 static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
1583 {
1584         if (!app) {
1585                 return NULL;
1586         }
1587
1588         return app_event_sources_to_json(app, app_to_json(app));
1589 }
1590
1591 struct ast_json *stasis_app_to_json(const char *app_name)
1592 {
1593         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1594
1595         return stasis_app_object_to_json(app);
1596 }
1597
1598 /*!
1599  * \internal
1600  * \brief Finds an event source that matches a uri scheme.
1601  *
1602  * Uri(s) should begin with a particular scheme that can be matched
1603  * against an event source.
1604  *
1605  * \param uri uri containing a scheme to match
1606  *
1607  * \retval an event source if found, NULL otherwise.
1608  */
1609 static struct stasis_app_event_source *app_event_source_find(const char *uri)
1610 {
1611         struct stasis_app_event_source *source;
1612         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1613         AST_LIST_TRAVERSE(&event_sources, source, next) {
1614                 if (ast_begins_with(uri, source->scheme)) {
1615                         return source;
1616                 }
1617         }
1618         return NULL;
1619 }
1620
1621 /*!
1622  * \internal
1623  * \brief Callback for subscription handling
1624  *
1625  * \param app [un]subscribing application
1626  * \param uri scheme:id of an event source
1627  * \param event_source being [un]subscribed [from]to
1628  *
1629  * \retval stasis_app_subscribe_res return code.
1630  */
1631 typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
1632         struct stasis_app *app, const char *uri,
1633         struct stasis_app_event_source *event_source);
1634
1635 /*!
1636  * \internal
1637  * \brief Subscriptions handler for application [un]subscribing.
1638  *
1639  * \param app_name Name of the application to subscribe.
1640  * \param event_source_uris URIs for the event sources to subscribe to.
1641  * \param event_sources_count Array size of event_source_uris.
1642  * \param json Optional output pointer for JSON representation of the app
1643  *             after adding the subscription.
1644  * \param handler [un]subscribe handler
1645  *
1646  * \retval stasis_app_subscribe_res return code.
1647  */
1648 static enum stasis_app_subscribe_res app_handle_subscriptions(
1649         const char *app_name, const char **event_source_uris,
1650         int event_sources_count, struct ast_json **json,
1651         app_subscription_handler handler)
1652 {
1653         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1654         int i;
1655
1656         if (!app) {
1657                 return STASIS_ASR_APP_NOT_FOUND;
1658         }
1659
1660         for (i = 0; i < event_sources_count; ++i) {
1661                 const char *uri = event_source_uris[i];
1662                 enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
1663                 struct stasis_app_event_source *event_source;
1664
1665                 if (!(event_source = app_event_source_find(uri))) {
1666                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1667                         return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1668                 }
1669
1670                 if (handler &&
1671                     ((res = handler(app, uri, event_source)))) {
1672                         return res;
1673                 }
1674         }
1675
1676         if (json) {
1677                 ast_debug(3, "%s: Successful; setting results\n", app_name);
1678                 *json = stasis_app_object_to_json(app);
1679         }
1680
1681         return STASIS_ASR_OK;
1682 }
1683
1684 enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
1685         struct ast_channel *chan)
1686 {
1687         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1688         int res;
1689
1690         if (!app) {
1691                 return STASIS_ASR_APP_NOT_FOUND;
1692         }
1693
1694         ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
1695
1696         res = app_subscribe_channel(app, chan);
1697         if (res != 0) {
1698                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
1699                         app_name, ast_channel_uniqueid(chan));
1700                 return STASIS_ASR_INTERNAL_ERROR;
1701         }
1702
1703         return STASIS_ASR_OK;
1704 }
1705
1706
1707 /*!
1708  * \internal
1709  * \brief Subscribe an app to an event source.
1710  *
1711  * \param app subscribing application
1712  * \param uri scheme:id of an event source
1713  * \param event_source being subscribed to
1714  *
1715  * \retval stasis_app_subscribe_res return code.
1716  */
1717 static enum stasis_app_subscribe_res app_subscribe(
1718         struct stasis_app *app, const char *uri,
1719         struct stasis_app_event_source *event_source)
1720 {
1721         const char *app_name = stasis_app_name(app);
1722         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1723
1724         ast_debug(3, "%s: Checking %s\n", app_name, uri);
1725
1726         if (!event_source->find ||
1727             (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
1728                 ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
1729                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1730         }
1731
1732         ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
1733
1734         if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
1735                 ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
1736                         app_name, uri);
1737                 return STASIS_ASR_INTERNAL_ERROR;
1738         }
1739
1740         return STASIS_ASR_OK;
1741 }
1742
1743 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
1744         const char **event_source_uris, int event_sources_count,
1745         struct ast_json **json)
1746 {
1747         return app_handle_subscriptions(
1748                 app_name, event_source_uris, event_sources_count,
1749                 json, app_subscribe);
1750 }
1751
1752 /*!
1753  * \internal
1754  * \brief Unsubscribe an app from an event source.
1755  *
1756  * \param app application to unsubscribe
1757  * \param uri scheme:id of an event source
1758  * \param event_source being unsubscribed from
1759  *
1760  * \retval stasis_app_subscribe_res return code.
1761  */
1762 static enum stasis_app_subscribe_res app_unsubscribe(
1763         struct stasis_app *app, const char *uri,
1764         struct stasis_app_event_source *event_source)
1765 {
1766         const char *app_name = stasis_app_name(app);
1767         const char *id = uri + strlen(event_source->scheme);
1768
1769         if (!event_source->is_subscribed ||
1770             (!event_source->is_subscribed(app, id))) {
1771                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1772         }
1773
1774         ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
1775
1776         if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
1777                 ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
1778                         app_name, uri);
1779                 return -1;
1780         }
1781         return 0;
1782 }
1783
1784 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1785         const char **event_source_uris, int event_sources_count,
1786         struct ast_json **json)
1787 {
1788         return app_handle_subscriptions(
1789                 app_name, event_source_uris, event_sources_count,
1790                 json, app_unsubscribe);
1791 }
1792
1793 enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
1794         const char *event_name,
1795         const char **source_uris, int sources_count,
1796         struct ast_json *json_variables)
1797 {
1798         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1799         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
1800         RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
1801         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1802         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
1803         enum stasis_app_user_event_res res = STASIS_APP_USER_INTERNAL_ERROR;
1804         struct ast_json *json_value;
1805         int have_channel = 0;
1806         int i;
1807
1808         if (!app) {
1809                 ast_log(LOG_WARNING, "App %s not found\n", app_name);
1810                 return STASIS_APP_USER_APP_NOT_FOUND;
1811         }
1812
1813         if (!ast_multi_user_event_type()) {
1814                 return res;
1815         }
1816
1817         blob = json_variables;
1818         if (!blob) {
1819                 blob = ast_json_pack("{}");
1820         } else {
1821                 ast_json_ref(blob);
1822         }
1823         json_value = ast_json_string_create(event_name);
1824         if (!json_value) {
1825                 ast_log(LOG_ERROR, "unable to create json string\n");
1826                 return res;
1827         }
1828         if (ast_json_object_set(blob, "eventname", json_value)) {
1829                 ast_log(LOG_ERROR, "unable to set eventname to blob\n");
1830                 return res;
1831         }
1832
1833         multi = ast_multi_object_blob_create(blob);
1834
1835         for (i = 0; i < sources_count; ++i) {
1836                 const char *uri = source_uris[i];
1837                 void *snapshot=NULL;
1838                 enum stasis_user_multi_object_snapshot_type type;
1839
1840                 if (ast_begins_with(uri, "channel:")) {
1841                         type = STASIS_UMOS_CHANNEL;
1842                         snapshot = ast_channel_snapshot_get_latest(uri + 8);
1843                         have_channel = 1;
1844                 } else if (ast_begins_with(uri, "bridge:")) {
1845                         type = STASIS_UMOS_BRIDGE;
1846                         snapshot = ast_bridge_snapshot_get_latest(uri + 7);
1847                 } else if (ast_begins_with(uri, "endpoint:")) {
1848                         type = STASIS_UMOS_ENDPOINT;
1849                         snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
1850                 } else {
1851                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1852                         return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
1853                 }
1854                 if (!snapshot) {
1855                         ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
1856                         return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
1857                 }
1858                 ast_multi_object_blob_add(multi, type, snapshot);
1859         }
1860
1861         message = stasis_message_create(ast_multi_user_event_type(), multi);
1862         if (!message) {
1863                 ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
1864                 return res;
1865         }
1866
1867         /*
1868          * Publishing to two different topics is normally to be avoided -- except
1869          * in this case both are final destinations with no forwards (only listeners).
1870          * The message has to be delivered to the application topic for ARI, but a
1871          * copy is also delivered directly to the manager for AMI if there is a channel.
1872          */
1873         stasis_publish(ast_app_get_topic(app), message);
1874
1875         if (have_channel) {
1876                 stasis_publish(ast_manager_get_topic(), message);
1877         }
1878
1879         return STASIS_APP_USER_OK;
1880 }
1881
1882 void stasis_app_ref(void)
1883 {
1884         ast_module_ref(ast_module_info->self);
1885 }
1886
1887 void stasis_app_unref(void)
1888 {
1889         ast_module_unref(ast_module_info->self);
1890 }
1891
1892 static int unload_module(void)
1893 {
1894         stasis_app_unregister_event_sources();
1895
1896         messaging_cleanup();
1897
1898         cleanup();
1899         ao2_cleanup(apps_registry);
1900         apps_registry = NULL;
1901
1902         ao2_cleanup(app_controls);
1903         app_controls = NULL;
1904
1905         ao2_cleanup(app_bridges);
1906         app_bridges = NULL;
1907
1908         ao2_cleanup(app_bridges_moh);
1909         app_bridges_moh = NULL;
1910
1911         ao2_cleanup(app_bridges_playback);
1912         app_bridges_playback = NULL;
1913
1914         STASIS_MESSAGE_TYPE_CLEANUP(end_message_type);
1915         STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
1916
1917         return 0;
1918 }
1919
1920 /* \brief Sanitization callback for channel snapshots */
1921 static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
1922 {
1923         if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
1924                 return 0;
1925         }
1926         return 1;
1927 }
1928
1929 /* \brief Sanitization callback for channels */
1930 static int channel_sanitizer(const struct ast_channel *chan)
1931 {
1932         if (!chan || !(ast_channel_tech(chan)->properties & AST_CHAN_TP_INTERNAL)) {
1933                 return 0;
1934         }
1935         return 1;
1936 }
1937
1938 /* \brief Sanitization callback for channel unique IDs */
1939 static int channel_id_sanitizer(const char *id)
1940 {
1941         RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup);
1942
1943         return channel_snapshot_sanitizer(snapshot);
1944 }
1945
1946 /* \brief Sanitization callbacks for communication to Stasis applications */
1947 struct stasis_message_sanitizer app_sanitizer = {
1948         .channel_id = channel_id_sanitizer,
1949         .channel_snapshot = channel_snapshot_sanitizer,
1950         .channel = channel_sanitizer,
1951 };
1952
1953 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
1954 {
1955         return &app_sanitizer;
1956 }
1957
1958 static const struct ast_datastore_info stasis_internal_channel_info = {
1959         .type = "stasis-internal-channel",
1960 };
1961
1962 static int set_internal_datastore(struct ast_channel *chan)
1963 {
1964         struct ast_datastore *datastore;
1965
1966         datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
1967         if (!datastore) {
1968                 datastore = ast_datastore_alloc(&stasis_internal_channel_info, NULL);
1969                 if (!datastore) {
1970                         return -1;
1971                 }
1972                 ast_channel_datastore_add(chan, datastore);
1973         }
1974         return 0;
1975 }
1976
1977 int stasis_app_channel_unreal_set_internal(struct ast_channel *chan)
1978 {
1979         struct ast_channel *outchan = NULL, *outowner = NULL;
1980         int res = 0;
1981         struct ast_unreal_pvt *unreal_pvt = ast_channel_tech_pvt(chan);
1982
1983         ao2_ref(unreal_pvt, +1);
1984         ast_unreal_lock_all(unreal_pvt, &outowner, &outchan);
1985         if (outowner) {
1986                 res |= set_internal_datastore(outowner);
1987                 ast_channel_unlock(outowner);
1988                 ast_channel_unref(outowner);
1989         }
1990         if (outchan) {
1991                 res |= set_internal_datastore(outchan);
1992                 ast_channel_unlock(outchan);
1993                 ast_channel_unref(outchan);
1994         }
1995         ao2_unlock(unreal_pvt);
1996         ao2_ref(unreal_pvt, -1);
1997         return res;
1998 }
1999
2000 int stasis_app_channel_set_internal(struct ast_channel *chan)
2001 {
2002         int res;
2003
2004         ast_channel_lock(chan);
2005         res = set_internal_datastore(chan);
2006         ast_channel_unlock(chan);
2007
2008         return res;
2009 }
2010
2011 int stasis_app_channel_is_internal(struct ast_channel *chan)
2012 {
2013         struct ast_datastore *datastore;
2014         int res = 0;
2015
2016         ast_channel_lock(chan);
2017         datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
2018         if (datastore) {
2019                 res = 1;
2020         }
2021         ast_channel_unlock(chan);
2022
2023         return res;
2024 }
2025
2026 static int load_module(void)
2027 {
2028         if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
2029                 return AST_MODULE_LOAD_DECLINE;
2030         }
2031         if (STASIS_MESSAGE_TYPE_INIT(end_message_type) != 0) {
2032                 return AST_MODULE_LOAD_DECLINE;
2033         }
2034         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
2035         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
2036         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
2037         app_bridges_moh = ao2_container_alloc_hash(
2038                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
2039                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
2040         app_bridges_playback = ao2_container_alloc_hash(
2041                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
2042                 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
2043         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh || !app_bridges_playback) {
2044                 unload_module();
2045                 return AST_MODULE_LOAD_FAILURE;
2046         }
2047
2048         if (messaging_init()) {
2049                 unload_module();
2050                 return AST_MODULE_LOAD_FAILURE;
2051         }
2052
2053         bridge_stasis_init();
2054
2055         stasis_app_register_event_sources();
2056
2057         return AST_MODULE_LOAD_SUCCESS;
2058 }
2059
2060 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
2061         .support_level = AST_MODULE_SUPPORT_CORE,
2062         .load = load_module,
2063         .unload = unload_module,
2064         );