691462722e549eea34944b771ec83a2f41ce0000
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_message_router.h"
65 #include "asterisk/strings.h"
66 #include "stasis/app.h"
67 #include "stasis/control.h"
68 #include "asterisk/core_unreal.h"
69 #include "asterisk/musiconhold.h"
70 #include "asterisk/causes.h"
71 #include "asterisk/stringfields.h"
72 #include "asterisk/bridge_after.h"
73
74 /*! Time to wait for a frame in the application */
75 #define MAX_WAIT_MS 200
76
77 /*!
78  * \brief Number of buckets for the Stasis application hash table.  Remember to
79  * keep it a prime number!
80  */
81 #define APPS_NUM_BUCKETS 127
82
83 /*!
84  * \brief Number of buckets for the Stasis application hash table.  Remember to
85  * keep it a prime number!
86  */
87 #define CONTROLS_NUM_BUCKETS 127
88
89 /*!
90  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
91  * keep it a prime number!
92  */
93 #define BRIDGES_NUM_BUCKETS 127
94
95 /*!
96  * \brief Stasis application container.
97  */
98 struct ao2_container *apps_registry;
99
100 struct ao2_container *app_controls;
101
102 struct ao2_container *app_bridges;
103
104 struct ao2_container *app_bridges_moh;
105
106 const char *stasis_app_name(const struct stasis_app *app)
107 {
108         return app_name(app);
109 }
110
111 /*! AO2 hash function for \ref app */
112 static int app_hash(const void *obj, const int flags)
113 {
114         const struct stasis_app *app;
115         const char *key;
116
117         switch (flags & OBJ_SEARCH_MASK) {
118         case OBJ_SEARCH_KEY:
119                 key = obj;
120                 break;
121         case OBJ_SEARCH_OBJECT:
122                 app = obj;
123                 key = stasis_app_name(app);
124                 break;
125         default:
126                 /* Hash can only work on something with a full key. */
127                 ast_assert(0);
128                 return 0;
129         }
130         return ast_str_hash(key);
131 }
132
133 /*! AO2 comparison function for \ref app */
134 static int app_compare(void *obj, void *arg, int flags)
135 {
136         const struct stasis_app *object_left = obj;
137         const struct stasis_app *object_right = arg;
138         const char *right_key = arg;
139         int cmp;
140
141         switch (flags & OBJ_SEARCH_MASK) {
142         case OBJ_SEARCH_OBJECT:
143                 right_key = stasis_app_name(object_right);
144                 /* Fall through */
145         case OBJ_SEARCH_KEY:
146                 cmp = strcmp(stasis_app_name(object_left), right_key);
147                 break;
148         case OBJ_SEARCH_PARTIAL_KEY:
149                 /*
150                  * We could also use a partial key struct containing a length
151                  * so strlen() does not get called for every comparison instead.
152                  */
153                 cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
154                 break;
155         default:
156                 /*
157                  * What arg points to is specific to this traversal callback
158                  * and has no special meaning to astobj2.
159                  */
160                 cmp = 0;
161                 break;
162         }
163         if (cmp) {
164                 return 0;
165         }
166         /*
167          * At this point the traversal callback is identical to a sorted
168          * container.
169          */
170         return CMP_MATCH;
171 }
172
173 /*! AO2 hash function for \ref stasis_app_control */
174 static int control_hash(const void *obj, const int flags)
175 {
176         const struct stasis_app_control *control;
177         const char *key;
178
179         switch (flags & OBJ_SEARCH_MASK) {
180         case OBJ_SEARCH_KEY:
181                 key = obj;
182                 break;
183         case OBJ_SEARCH_OBJECT:
184                 control = obj;
185                 key = stasis_app_control_get_channel_id(control);
186                 break;
187         default:
188                 /* Hash can only work on something with a full key. */
189                 ast_assert(0);
190                 return 0;
191         }
192         return ast_str_hash(key);
193 }
194
195 /*! AO2 comparison function for \ref stasis_app_control */
196 static int control_compare(void *obj, void *arg, int flags)
197 {
198         const struct stasis_app_control *object_left = obj;
199         const struct stasis_app_control *object_right = arg;
200         const char *right_key = arg;
201         int cmp;
202
203         switch (flags & OBJ_SEARCH_MASK) {
204         case OBJ_SEARCH_OBJECT:
205                 right_key = stasis_app_control_get_channel_id(object_right);
206                 /* Fall through */
207         case OBJ_SEARCH_KEY:
208                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
209                 break;
210         case OBJ_SEARCH_PARTIAL_KEY:
211                 /*
212                  * We could also use a partial key struct containing a length
213                  * so strlen() does not get called for every comparison instead.
214                  */
215                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
216                 break;
217         default:
218                 /*
219                  * What arg points to is specific to this traversal callback
220                  * and has no special meaning to astobj2.
221                  */
222                 cmp = 0;
223                 break;
224         }
225         if (cmp) {
226                 return 0;
227         }
228         /*
229          * At this point the traversal callback is identical to a sorted
230          * container.
231          */
232         return CMP_MATCH;
233 }
234
235 static int cleanup_cb(void *obj, void *arg, int flags)
236 {
237         struct stasis_app *app = obj;
238
239         if (!app_is_finished(app)) {
240                 return 0;
241         }
242
243         ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
244         app_shutdown(app);
245
246         return CMP_MATCH;
247
248 }
249
250 /*!
251  * \brief Clean up any old apps that we don't need any more.
252  */
253 static void cleanup(void)
254 {
255         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
256                 cleanup_cb, NULL);
257 }
258
259 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
260 {
261         return control_create(chan);
262 }
263
264 struct stasis_app_control *stasis_app_control_find_by_channel(
265         const struct ast_channel *chan)
266 {
267         if (chan == NULL) {
268                 return NULL;
269         }
270
271         return stasis_app_control_find_by_channel_id(
272                 ast_channel_uniqueid(chan));
273 }
274
275 struct stasis_app_control *stasis_app_control_find_by_channel_id(
276         const char *channel_id)
277 {
278         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
279 }
280
281 /*! AO2 hash function for bridges container  */
282 static int bridges_hash(const void *obj, const int flags)
283 {
284         const struct ast_bridge *bridge;
285         const char *key;
286
287         switch (flags & OBJ_SEARCH_MASK) {
288         case OBJ_SEARCH_KEY:
289                 key = obj;
290                 break;
291         case OBJ_SEARCH_OBJECT:
292                 bridge = obj;
293                 key = bridge->uniqueid;
294                 break;
295         default:
296                 /* Hash can only work on something with a full key. */
297                 ast_assert(0);
298                 return 0;
299         }
300         return ast_str_hash(key);
301 }
302
303 /*! AO2 comparison function for bridges container */
304 static int bridges_compare(void *obj, void *arg, int flags)
305 {
306         const struct ast_bridge *object_left = obj;
307         const struct ast_bridge *object_right = arg;
308         const char *right_key = arg;
309         int cmp;
310
311         switch (flags & OBJ_SEARCH_MASK) {
312         case OBJ_SEARCH_OBJECT:
313                 right_key = object_right->uniqueid;
314                 /* Fall through */
315         case OBJ_SEARCH_KEY:
316                 cmp = strcmp(object_left->uniqueid, right_key);
317                 break;
318         case OBJ_SEARCH_PARTIAL_KEY:
319                 /*
320                  * We could also use a partial key struct containing a length
321                  * so strlen() does not get called for every comparison instead.
322                  */
323                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
324                 break;
325         default:
326                 /*
327                  * What arg points to is specific to this traversal callback
328                  * and has no special meaning to astobj2.
329                  */
330                 cmp = 0;
331                 break;
332         }
333         if (cmp) {
334                 return 0;
335         }
336         /*
337          * At this point the traversal callback is identical to a sorted
338          * container.
339          */
340         return CMP_MATCH;
341 }
342
343 /*!
344  *  Used with app_bridges_moh, provides links between bridges and existing music
345  *  on hold channels that are being used with them.
346  */
347 struct stasis_app_bridge_moh_wrapper {
348         AST_DECLARE_STRING_FIELDS(
349                 AST_STRING_FIELD(channel_id);
350                 AST_STRING_FIELD(bridge_id);
351         );
352 };
353
354 static void stasis_app_bridge_moh_wrapper_destructor(void *obj)
355 {
356         struct stasis_app_bridge_moh_wrapper *wrapper = obj;
357         ast_string_field_free_memory(wrapper);
358 }
359
360 /*! AO2 hash function for the bridges moh container */
361 static int bridges_moh_hash_fn(const void *obj, const int flags)
362 {
363         const struct stasis_app_bridge_moh_wrapper *wrapper;
364         const char *key;
365
366         switch (flags & OBJ_SEARCH_MASK) {
367         case OBJ_SEARCH_KEY:
368                 key = obj;
369                 break;
370         case OBJ_SEARCH_OBJECT:
371                 wrapper = obj;
372                 key = wrapper->bridge_id;
373                 break;
374         default:
375                 /* Hash can only work on something with a full key. */
376                 ast_assert(0);
377                 return 0;
378         }
379         return ast_str_hash(key);
380 }
381
382 static int bridges_moh_sort_fn(const void *obj_left, const void *obj_right, const int flags)
383 {
384         const struct stasis_app_bridge_moh_wrapper *left = obj_left;
385         const struct stasis_app_bridge_moh_wrapper *right = obj_right;
386         const char *right_key = obj_right;
387         int cmp;
388
389         switch (flags & OBJ_SEARCH_MASK) {
390         case OBJ_SEARCH_OBJECT:
391                 right_key = right->bridge_id;
392                 /* Fall through */
393         case OBJ_SEARCH_KEY:
394                 cmp = strcmp(left->bridge_id, right_key);
395                 break;
396         case OBJ_SEARCH_PARTIAL_KEY:
397                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
398                 break;
399         default:
400                 /* Sort can only work on something with a full or partial key. */
401                 ast_assert(0);
402                 cmp = 0;
403                 break;
404         }
405         return cmp;
406 }
407
408 /*! Removes the bridge to music on hold channel link */
409 static void remove_bridge_moh(char *bridge_id)
410 {
411         ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
412         ast_free(bridge_id);
413 }
414
415 /*! After bridge failure callback for moh channels */
416 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
417 {
418         char *bridge_id = data;
419
420         remove_bridge_moh(bridge_id);
421 }
422
423 /*! After bridge callback for moh channels */
424 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
425 {
426         char *bridge_id = data;
427
428         remove_bridge_moh(bridge_id);
429 }
430
431 /*! Request a bridge MOH channel */
432 static struct ast_channel *prepare_bridge_moh_channel(void)
433 {
434         RAII_VAR(struct ast_format_cap *, cap, NULL, ast_format_cap_destroy);
435         struct ast_format format;
436
437         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_NOLOCK);
438         if (!cap) {
439                 return NULL;
440         }
441
442         ast_format_cap_add(cap, ast_format_set(&format, AST_FORMAT_SLINEAR, 0));
443
444         return ast_request("Announcer", cap, NULL, "ARI_MOH", NULL);
445 }
446
447 /*! Provides the moh channel with a thread so it can actually play its music */
448 static void *moh_channel_thread(void *data)
449 {
450         struct ast_channel *moh_channel = data;
451
452         while (!ast_safe_sleep(moh_channel, 1000)) {
453         }
454
455         ast_moh_stop(moh_channel);
456         ast_hangup(moh_channel);
457
458         return NULL;
459 }
460
461 /*!
462  * \internal
463  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
464  *
465  * \param bridge Which bridge this moh channel exists for
466  *
467  * \retval NULL if the channel could not be created, pushed, or linked
468  * \retval Reference to the channel on success
469  */
470 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
471 {
472         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, new_wrapper, NULL, ao2_cleanup);
473         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
474         struct ast_channel *chan;
475         pthread_t threadid;
476
477         if (!bridge_id) {
478                 return NULL;
479         }
480
481         chan = prepare_bridge_moh_channel();
482         if (!chan) {
483                 return NULL;
484         }
485
486         /* The after bridge callback assumes responsibility of the bridge_id. */
487         if (ast_bridge_set_after_callback(chan,
488                 moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
489                 ast_hangup(chan);
490                 return NULL;
491         }
492         bridge_id = NULL;
493
494         if (ast_unreal_channel_push_to_bridge(chan, bridge,
495                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
496                 ast_hangup(chan);
497                 return NULL;
498         }
499
500         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
501                 stasis_app_bridge_moh_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
502         if (!new_wrapper) {
503                 ast_hangup(chan);
504                 return NULL;
505         }
506
507         if (ast_string_field_init(new_wrapper, 32)) {
508                 ast_hangup(chan);
509                 return NULL;
510         }
511         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
512         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
513
514         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
515                 ast_hangup(chan);
516                 return NULL;
517         }
518
519         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
520                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
521                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
522                 ast_hangup(chan);
523                 return NULL;
524         }
525
526         return chan;
527 }
528
529 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
530 {
531         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
532
533         {
534                 SCOPED_AO2LOCK(lock, app_bridges_moh);
535
536                 moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
537                 if (!moh_wrapper) {
538                         return bridge_moh_create(bridge);
539                 }
540         }
541
542         return ast_channel_get_by_name(moh_wrapper->channel_id);
543 }
544
545 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
546 {
547         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
548         struct ast_channel *chan;
549
550         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
551         if (!moh_wrapper) {
552                 return -1;
553         }
554
555         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
556         if (!chan) {
557                 return -1;
558         }
559
560         ast_moh_stop(chan);
561         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
562         ao2_cleanup(chan);
563
564         return 0;
565 }
566
567 struct ast_bridge *stasis_app_bridge_find_by_id(
568         const char *bridge_id)
569 {
570         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
571 }
572
573
574 /*!
575  * \brief In addition to running ao2_cleanup(), this function also removes the
576  * object from the app_controls container.
577  */
578 static void control_unlink(struct stasis_app_control *control)
579 {
580         if (!control) {
581                 return;
582         }
583
584         ao2_unlink(app_controls, control);
585         ao2_cleanup(control);
586 }
587
588 struct ast_bridge *stasis_app_bridge_create(const char *type)
589 {
590         struct ast_bridge *bridge;
591         int capabilities;
592         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
593                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
594                 | AST_BRIDGE_FLAG_TRANSFER_PROHIBITED;
595
596         if (ast_strlen_zero(type) || !strcmp(type, "mixing")) {
597                 capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
598                         AST_BRIDGE_CAPABILITY_MULTIMIX |
599                         AST_BRIDGE_CAPABILITY_NATIVE;
600                 flags |= AST_BRIDGE_FLAG_SMART;
601         } else if (!strcmp(type, "holding")) {
602                 capabilities = AST_BRIDGE_CAPABILITY_HOLDING;
603         } else {
604                 return NULL;
605         }
606
607         bridge = ast_bridge_base_new(capabilities, flags);
608         if (bridge) {
609                 if (!ao2_link(app_bridges, bridge)) {
610                         ast_bridge_destroy(bridge, 0);
611                         bridge = NULL;
612                 }
613         }
614         return bridge;
615 }
616
617 void stasis_app_bridge_destroy(const char *bridge_id)
618 {
619         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
620         if (!bridge) {
621                 return;
622         }
623         ao2_unlink(app_bridges, bridge);
624         ast_bridge_destroy(bridge, 0);
625 }
626
627 static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
628         int argc, char *argv[])
629 {
630         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
631         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
632
633         struct ast_json *json_args;
634         int i;
635         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
636
637         ast_assert(chan != NULL);
638
639         /* Set channel info */
640         snapshot = ast_channel_snapshot_create(chan);
641         if (!snapshot) {
642                 return -1;
643         }
644
645         if (sanitize && sanitize->channel_snapshot
646                 && sanitize->channel_snapshot(snapshot)) {
647                 return 0;
648         }
649
650         msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
651                 "type", "StasisStart",
652                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
653                 "args",
654                 "channel", ast_channel_snapshot_to_json(snapshot, NULL));
655         if (!msg) {
656                 return -1;
657         }
658
659         /* Append arguments to args array */
660         json_args = ast_json_object_get(msg, "args");
661         ast_assert(json_args != NULL);
662         for (i = 0; i < argc; ++i) {
663                 int r = ast_json_array_append(json_args,
664                                               ast_json_string_create(argv[i]));
665                 if (r != 0) {
666                         ast_log(LOG_ERROR, "Error appending start message\n");
667                         return -1;
668                 }
669         }
670
671         app_send(app, msg);
672         return 0;
673 }
674
675 static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
676 {
677         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
678         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
679         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
680
681         ast_assert(chan != NULL);
682
683         /* Set channel info */
684         snapshot = ast_channel_snapshot_create(chan);
685         if (snapshot == NULL) {
686                 return -1;
687         }
688
689         if (sanitize && sanitize->channel_snapshot
690                 && sanitize->channel_snapshot(snapshot)) {
691                 return 0;
692         }
693
694         msg = ast_json_pack("{s: s, s: o, s: o}",
695                 "type", "StasisEnd",
696                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
697                 "channel", ast_channel_snapshot_to_json(snapshot, NULL));
698         if (!msg) {
699                 return -1;
700         }
701
702         app_send(app, msg);
703         return 0;
704 }
705
706 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
707 {
708         while (!control_is_done(control)) {
709                 int command_count = control_dispatch_all(control, chan);
710                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
711                         break;
712                 }
713         }
714 }
715
716 /*! /brief Stasis dialplan application callback */
717 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
718                     char *argv[])
719 {
720         SCOPED_MODULE_USE(ast_module_info->self);
721
722         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
723         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
724         struct ast_bridge *last_bridge = NULL;
725         int res = 0;
726
727         ast_assert(chan != NULL);
728
729         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
730         if (!app) {
731                 ast_log(LOG_ERROR,
732                         "Stasis app '%s' not registered\n", app_name);
733                 return -1;
734         }
735         if (!app_is_active(app)) {
736                 ast_log(LOG_ERROR,
737                         "Stasis app '%s' not active\n", app_name);
738                 return -1;
739         }
740
741         control = control_create(chan);
742         if (!control) {
743                 ast_log(LOG_ERROR, "Allocated failed\n");
744                 return -1;
745         }
746         ao2_link(app_controls, control);
747
748         res = send_start_msg(app, chan, argc, argv);
749         if (res != 0) {
750                 ast_log(LOG_ERROR,
751                         "Error sending start message to '%s'\n", app_name);
752                 return -1;
753         }
754
755         res = app_subscribe_channel(app, chan);
756         if (res != 0) {
757                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
758                         app_name, ast_channel_name(chan));
759                 return -1;
760         }
761
762         while (!control_is_done(control)) {
763                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
764                 int r;
765                 int command_count;
766                 struct ast_bridge *bridge = NULL;
767
768                 /* Check to see if a bridge absorbed our hangup frame */
769                 if (ast_check_hangup_locked(chan)) {
770                         break;
771                 }
772
773                 last_bridge = bridge;
774                 bridge = stasis_app_get_bridge(control);
775
776                 if (bridge != last_bridge) {
777                         app_unsubscribe_bridge(app, last_bridge);
778                         app_subscribe_bridge(app, bridge);
779                 }
780
781                 if (bridge) {
782                         /* Bridge is handling channel frames */
783                         control_wait(control);
784                         control_dispatch_all(control, chan);
785                         continue;
786                 }
787
788                 r = ast_waitfor(chan, MAX_WAIT_MS);
789
790                 if (r < 0) {
791                         ast_debug(3, "%s: Poll error\n",
792                                   ast_channel_uniqueid(chan));
793                         break;
794                 }
795
796                 command_count = control_dispatch_all(control, chan);
797
798                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
799                         /* Command drained the channel; wait for next frame */
800                         continue;
801                 }
802
803                 if (r == 0) {
804                         /* Timeout */
805                         continue;
806                 }
807
808                 f = ast_read(chan);
809                 if (!f) {
810                         /* Continue on in the dialplan */
811                         ast_debug(3, "%s: Hangup (no more frames)\n",
812                                 ast_channel_uniqueid(chan));
813                         break;
814                 }
815
816                 if (f->frametype == AST_FRAME_CONTROL) {
817                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
818                                 /* Continue on in the dialplan */
819                                 ast_debug(3, "%s: Hangup\n",
820                                         ast_channel_uniqueid(chan));
821                                 break;
822                         }
823                 }
824         }
825
826         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
827         app_unsubscribe_channel(app, chan);
828
829         res = send_end_msg(app, chan);
830         if (res != 0) {
831                 ast_log(LOG_ERROR,
832                         "Error sending end message to %s\n", app_name);
833                 return res;
834         }
835
836         /* There's an off chance that app is ready for cleanup. Go ahead
837          * and clean up, just in case
838          */
839         cleanup();
840
841         return res;
842 }
843
844 int stasis_app_send(const char *app_name, struct ast_json *message)
845 {
846         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
847
848         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
849         if (!app) {
850                 /* XXX We can do a better job handling late binding, queueing up
851                  * the call for a few seconds to wait for the app to register.
852                  */
853                 ast_log(LOG_WARNING,
854                         "Stasis app '%s' not registered\n", app_name);
855                 return -1;
856         }
857         app_send(app, message);
858         return 0;
859 }
860
861 static struct stasis_app *find_app_by_name(const char *app_name)
862 {
863         struct stasis_app *res = NULL;
864
865         if (!ast_strlen_zero(app_name)) {
866                 res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
867         }
868
869         if (!res) {
870                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
871                         app_name ? : "(null)");
872         }
873         return res;
874 }
875
876 static int append_name(void *obj, void *arg, int flags)
877 {
878         struct stasis_app *app = obj;
879         struct ao2_container *apps = arg;
880
881         ast_str_container_add(apps, stasis_app_name(app));
882         return 0;
883 }
884
885 struct ao2_container *stasis_app_get_all(void)
886 {
887         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
888
889         apps = ast_str_container_alloc(1);
890         if (!apps) {
891                 return NULL;
892         }
893
894         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
895
896         return ao2_bump(apps);
897 }
898
899 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
900 {
901         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
902
903         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
904
905         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
906         if (app) {
907                 app_update(app, handler, data);
908         } else {
909                 app = app_create(app_name, handler, data);
910                 if (app) {
911                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
912                 } else {
913                         return -1;
914                 }
915         }
916
917         /* We lazily clean up the apps_registry, because it's good enough to
918          * prevent memory leaks, and we're lazy.
919          */
920         cleanup();
921         return 0;
922 }
923
924 void stasis_app_unregister(const char *app_name)
925 {
926         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
927
928         if (!app_name) {
929                 return;
930         }
931
932         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
933         if (!app) {
934                 ast_log(LOG_ERROR,
935                         "Stasis app '%s' not registered\n", app_name);
936                 return;
937         }
938
939         app_deactivate(app);
940
941         /* There's a decent chance that app is ready for cleanup. Go ahead
942          * and clean up, just in case
943          */
944         cleanup();
945 }
946
947 /*!
948  * \internal \brief List of registered event sources.
949  */
950 AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
951
952 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
953 {
954         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
955         AST_LIST_INSERT_TAIL(&event_sources, obj, next);
956         /* only need to bump the module ref on non-core sources because the
957            core ones are [un]registered by this module. */
958         if (!stasis_app_is_core_event_source(obj)) {
959                 ast_module_ref(ast_module_info->self);
960         }
961 }
962
963 void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
964 {
965         struct stasis_app_event_source *source;
966         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
967         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
968                 if (source == obj) {
969                         AST_RWLIST_REMOVE_CURRENT(next);
970                         if (!stasis_app_is_core_event_source(obj)) {
971                                 ast_module_unref(ast_module_info->self);
972                         }
973                         break;
974                 }
975         }
976         AST_RWLIST_TRAVERSE_SAFE_END;
977 }
978
979 /*!
980  * \internal
981  * \brief Convert event source data to JSON.
982  *
983  * Calls each event source that has a "to_json" handler allowing each
984  * source to add data to the given JSON object.
985  *
986  * \param app application associated with the event source
987  * \param json a json object to "fill"
988  *
989  * \retval The given json object.
990  */
991 static struct ast_json *app_event_sources_to_json(
992         const struct stasis_app *app, struct ast_json *json)
993 {
994         struct stasis_app_event_source *source;
995         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
996         AST_LIST_TRAVERSE(&event_sources, source, next) {
997                 if (source->to_json) {
998                         source->to_json(app, json);
999                 }
1000         }
1001         return json;
1002 }
1003
1004 static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
1005 {
1006         if (!app) {
1007                 return NULL;
1008         }
1009
1010         return app_event_sources_to_json(app, app_to_json(app));
1011 }
1012
1013 struct ast_json *stasis_app_to_json(const char *app_name)
1014 {
1015         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1016
1017         return stasis_app_object_to_json(app);
1018 }
1019
1020 /*!
1021  * \internal
1022  * \brief Finds an event source that matches a uri scheme.
1023  *
1024  * Uri(s) should begin with a particular scheme that can be matched
1025  * against an event source.
1026  *
1027  * \param uri uri containing a scheme to match
1028  *
1029  * \retval an event source if found, NULL otherwise.
1030  */
1031 static struct stasis_app_event_source *app_event_source_find(const char *uri)
1032 {
1033         struct stasis_app_event_source *source;
1034         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1035         AST_LIST_TRAVERSE(&event_sources, source, next) {
1036                 if (ast_begins_with(uri, source->scheme)) {
1037                         return source;
1038                 }
1039         }
1040         return NULL;
1041 }
1042
1043 /*!
1044  * \internal
1045  * \brief Callback for subscription handling
1046  *
1047  * \param app [un]subscribing application
1048  * \param uri scheme:id of an event source
1049  * \param event_source being [un]subscribed [from]to
1050  *
1051  * \retval stasis_app_subscribe_res return code.
1052  */
1053 typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
1054         struct stasis_app *app, const char *uri,
1055         struct stasis_app_event_source *event_source);
1056
1057 /*!
1058  * \internal
1059  * \brief Subscriptions handler for application [un]subscribing.
1060  *
1061  * \param app_name Name of the application to subscribe.
1062  * \param event_source_uris URIs for the event sources to subscribe to.
1063  * \param event_sources_count Array size of event_source_uris.
1064  * \param json Optional output pointer for JSON representation of the app
1065  *             after adding the subscription.
1066  * \param handler [un]subscribe handler
1067  *
1068  * \retval stasis_app_subscribe_res return code.
1069  */
1070 static enum stasis_app_subscribe_res app_handle_subscriptions(
1071         const char *app_name, const char **event_source_uris,
1072         int event_sources_count, struct ast_json **json,
1073         app_subscription_handler handler)
1074 {
1075         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1076         int i;
1077
1078         if (!app) {
1079                 return STASIS_ASR_APP_NOT_FOUND;
1080         }
1081
1082         for (i = 0; i < event_sources_count; ++i) {
1083                 const char *uri = event_source_uris[i];
1084                 enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
1085                 struct stasis_app_event_source *event_source;
1086
1087                 if (!(event_source = app_event_source_find(uri))) {
1088                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1089                         return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1090                 }
1091
1092                 if (handler &&
1093                     ((res = handler(app, uri, event_source)))) {
1094                         return res;
1095                 }
1096         }
1097
1098         if (json) {
1099                 ast_debug(3, "%s: Successful; setting results\n", app_name);
1100                 *json = stasis_app_object_to_json(app);
1101         }
1102
1103         return STASIS_ASR_OK;
1104 }
1105
1106 /*!
1107  * \internal
1108  * \brief Subscribe an app to an event source.
1109  *
1110  * \param app subscribing application
1111  * \param uri scheme:id of an event source
1112  * \param event_source being subscribed to
1113  *
1114  * \retval stasis_app_subscribe_res return code.
1115  */
1116 static enum stasis_app_subscribe_res app_subscribe(
1117         struct stasis_app *app, const char *uri,
1118         struct stasis_app_event_source *event_source)
1119 {
1120         const char *app_name = stasis_app_name(app);
1121         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1122
1123         ast_debug(3, "%s: Checking %s\n", app_name, uri);
1124
1125         if (!event_source->find ||
1126             (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
1127                 ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
1128                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1129         }
1130
1131         ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
1132
1133         if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
1134                 ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
1135                         app_name, uri);
1136                 return STASIS_ASR_INTERNAL_ERROR;
1137         }
1138
1139         return STASIS_ASR_OK;
1140 }
1141
1142 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
1143         const char **event_source_uris, int event_sources_count,
1144         struct ast_json **json)
1145 {
1146         return app_handle_subscriptions(
1147                 app_name, event_source_uris, event_sources_count,
1148                 json, app_subscribe);
1149 }
1150
1151 /*!
1152  * \internal
1153  * \brief Unsubscribe an app from an event source.
1154  *
1155  * \param app application to unsubscribe
1156  * \param uri scheme:id of an event source
1157  * \param event_source being unsubscribed from
1158  *
1159  * \retval stasis_app_subscribe_res return code.
1160  */
1161 static enum stasis_app_subscribe_res app_unsubscribe(
1162         struct stasis_app *app, const char *uri,
1163         struct stasis_app_event_source *event_source)
1164 {
1165         const char *app_name = stasis_app_name(app);
1166         const char *id = uri + strlen(event_source->scheme);
1167
1168         if (!event_source->is_subscribed ||
1169             (!event_source->is_subscribed(app, id))) {
1170                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1171         }
1172
1173         ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
1174
1175         if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
1176                 ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
1177                         app_name, uri);
1178                 return -1;
1179         }
1180         return 0;
1181 }
1182
1183 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1184         const char **event_source_uris, int event_sources_count,
1185         struct ast_json **json)
1186 {
1187         return app_handle_subscriptions(
1188                 app_name, event_source_uris, event_sources_count,
1189                 json, app_unsubscribe);
1190 }
1191
1192 void stasis_app_ref(void)
1193 {
1194         ast_module_ref(ast_module_info->self);
1195 }
1196
1197 void stasis_app_unref(void)
1198 {
1199         ast_module_unref(ast_module_info->self);
1200 }
1201
1202 static int unload_module(void)
1203 {
1204         stasis_app_unregister_event_sources();
1205
1206         ao2_cleanup(apps_registry);
1207         apps_registry = NULL;
1208
1209         ao2_cleanup(app_controls);
1210         app_controls = NULL;
1211
1212         ao2_cleanup(app_bridges);
1213         app_bridges = NULL;
1214
1215         ao2_cleanup(app_bridges_moh);
1216         app_bridges_moh = NULL;
1217
1218         return 0;
1219 }
1220
1221 /* \brief Sanitization callback for channel snapshots */
1222 static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
1223 {
1224         if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
1225                 return 0;
1226         }
1227         return 1;
1228 }
1229
1230 /* \brief Sanitization callback for channel unique IDs */
1231 static int channel_id_sanitizer(const char *id)
1232 {
1233         RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup);
1234
1235         return channel_snapshot_sanitizer(snapshot);
1236 }
1237
1238 /* \brief Sanitization callbacks for communication to Stasis applications */
1239 struct stasis_message_sanitizer app_sanitizer = {
1240         .channel_id = channel_id_sanitizer,
1241         .channel_snapshot = channel_snapshot_sanitizer,
1242 };
1243
1244 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
1245 {
1246         return &app_sanitizer;
1247 }
1248
1249 static int load_module(void)
1250 {
1251         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
1252         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
1253         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
1254         app_bridges_moh = ao2_container_alloc_hash(
1255                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1256                 37, bridges_moh_hash_fn, bridges_moh_sort_fn, NULL);
1257         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh) {
1258                 unload_module();
1259                 return AST_MODULE_LOAD_FAILURE;
1260         }
1261
1262         stasis_app_register_event_sources();
1263
1264         return AST_MODULE_LOAD_SUCCESS;
1265 }
1266
1267 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
1268         .load = load_module,
1269         .unload = unload_module,
1270         );