stasis: Fixed scoping problem with bridge tracking.
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_message_router.h"
65 #include "asterisk/strings.h"
66 #include "stasis/app.h"
67 #include "stasis/control.h"
68 #include "asterisk/core_unreal.h"
69 #include "asterisk/musiconhold.h"
70 #include "asterisk/causes.h"
71 #include "asterisk/stringfields.h"
72 #include "asterisk/bridge_after.h"
73
/*! Time to wait for a frame in the application (passed to ast_waitfor()) */
#define MAX_WAIT_MS 200

/*!
 * \brief Number of buckets for the Stasis application hash table.  Remember to
 * keep it a prime number!
 */
#define APPS_NUM_BUCKETS 127

/*!
 * \brief Number of buckets for the Stasis application controls hash table.
 * Remember to keep it a prime number!
 */
#define CONTROLS_NUM_BUCKETS 127

/*!
 * \brief Number of buckets for the Stasis bridges hash table.  Remember to
 * keep it a prime number!
 */
#define BRIDGES_NUM_BUCKETS 127
94
/*!
 * \brief Stasis application container, searched by application name.
 */
struct ao2_container *apps_registry;

/*! \brief Container of \ref stasis_app_control objects, searched by channel id. */
struct ao2_container *app_controls;

/*! \brief Container of bridges created through Stasis, searched by bridge id. */
struct ao2_container *app_bridges;

/*!
 * \brief Container of \ref stasis_app_bridge_moh_wrapper objects, searched by
 * bridge id.
 */
struct ao2_container *app_bridges_moh;
105
106 /*! AO2 hash function for \ref app */
107 static int app_hash(const void *obj, const int flags)
108 {
109         const struct app *app;
110         const char *key;
111
112         switch (flags & OBJ_SEARCH_MASK) {
113         case OBJ_SEARCH_KEY:
114                 key = obj;
115                 break;
116         case OBJ_SEARCH_OBJECT:
117                 app = obj;
118                 key = app_name(app);
119                 break;
120         default:
121                 /* Hash can only work on something with a full key. */
122                 ast_assert(0);
123                 return 0;
124         }
125         return ast_str_hash(key);
126 }
127
128 /*! AO2 comparison function for \ref app */
129 static int app_compare(void *obj, void *arg, int flags)
130 {
131         const struct app *object_left = obj;
132         const struct app *object_right = arg;
133         const char *right_key = arg;
134         int cmp;
135
136         switch (flags & OBJ_SEARCH_MASK) {
137         case OBJ_SEARCH_OBJECT:
138                 right_key = app_name(object_right);
139                 /* Fall through */
140         case OBJ_SEARCH_KEY:
141                 cmp = strcmp(app_name(object_left), right_key);
142                 break;
143         case OBJ_SEARCH_PARTIAL_KEY:
144                 /*
145                  * We could also use a partial key struct containing a length
146                  * so strlen() does not get called for every comparison instead.
147                  */
148                 cmp = strncmp(app_name(object_left), right_key, strlen(right_key));
149                 break;
150         default:
151                 /*
152                  * What arg points to is specific to this traversal callback
153                  * and has no special meaning to astobj2.
154                  */
155                 cmp = 0;
156                 break;
157         }
158         if (cmp) {
159                 return 0;
160         }
161         /*
162          * At this point the traversal callback is identical to a sorted
163          * container.
164          */
165         return CMP_MATCH;
166 }
167
168 /*! AO2 hash function for \ref stasis_app_control */
169 static int control_hash(const void *obj, const int flags)
170 {
171         const struct stasis_app_control *control;
172         const char *key;
173
174         switch (flags & OBJ_SEARCH_MASK) {
175         case OBJ_SEARCH_KEY:
176                 key = obj;
177                 break;
178         case OBJ_SEARCH_OBJECT:
179                 control = obj;
180                 key = stasis_app_control_get_channel_id(control);
181                 break;
182         default:
183                 /* Hash can only work on something with a full key. */
184                 ast_assert(0);
185                 return 0;
186         }
187         return ast_str_hash(key);
188 }
189
190 /*! AO2 comparison function for \ref stasis_app_control */
191 static int control_compare(void *obj, void *arg, int flags)
192 {
193         const struct stasis_app_control *object_left = obj;
194         const struct stasis_app_control *object_right = arg;
195         const char *right_key = arg;
196         int cmp;
197
198         switch (flags & OBJ_SEARCH_MASK) {
199         case OBJ_SEARCH_OBJECT:
200                 right_key = stasis_app_control_get_channel_id(object_right);
201                 /* Fall through */
202         case OBJ_SEARCH_KEY:
203                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
204                 break;
205         case OBJ_SEARCH_PARTIAL_KEY:
206                 /*
207                  * We could also use a partial key struct containing a length
208                  * so strlen() does not get called for every comparison instead.
209                  */
210                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
211                 break;
212         default:
213                 /*
214                  * What arg points to is specific to this traversal callback
215                  * and has no special meaning to astobj2.
216                  */
217                 cmp = 0;
218                 break;
219         }
220         if (cmp) {
221                 return 0;
222         }
223         /*
224          * At this point the traversal callback is identical to a sorted
225          * container.
226          */
227         return CMP_MATCH;
228 }
229
230 static int cleanup_cb(void *obj, void *arg, int flags)
231 {
232         struct app *app = obj;
233
234         if (!app_is_finished(app)) {
235                 return 0;
236         }
237
238         ast_verb(1, "Shutting down application '%s'\n", app_name(app));
239         app_shutdown(app);
240
241         return CMP_MATCH;
242
243 }
244
245 /*!
246  * \brief Clean up any old apps that we don't need any more.
247  */
248 static void cleanup(void)
249 {
250         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
251                 cleanup_cb, NULL);
252 }
253
/*!
 * \brief Create a control object for a channel.
 *
 * \param chan Channel handed to control_create().
 *
 * \return Whatever control_create() returns for \a chan.
 */
struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
{
        struct stasis_app_control *created = control_create(chan);

        return created;
}
258
259 struct stasis_app_control *stasis_app_control_find_by_channel(
260         const struct ast_channel *chan)
261 {
262         if (chan == NULL) {
263                 return NULL;
264         }
265
266         return stasis_app_control_find_by_channel_id(
267                 ast_channel_uniqueid(chan));
268 }
269
270 struct stasis_app_control *stasis_app_control_find_by_channel_id(
271         const char *channel_id)
272 {
273         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
274 }
275
276 /*! AO2 hash function for bridges container  */
277 static int bridges_hash(const void *obj, const int flags)
278 {
279         const struct ast_bridge *bridge;
280         const char *key;
281
282         switch (flags & OBJ_SEARCH_MASK) {
283         case OBJ_SEARCH_KEY:
284                 key = obj;
285                 break;
286         case OBJ_SEARCH_OBJECT:
287                 bridge = obj;
288                 key = bridge->uniqueid;
289                 break;
290         default:
291                 /* Hash can only work on something with a full key. */
292                 ast_assert(0);
293                 return 0;
294         }
295         return ast_str_hash(key);
296 }
297
298 /*! AO2 comparison function for bridges container */
299 static int bridges_compare(void *obj, void *arg, int flags)
300 {
301         const struct ast_bridge *object_left = obj;
302         const struct ast_bridge *object_right = arg;
303         const char *right_key = arg;
304         int cmp;
305
306         switch (flags & OBJ_SEARCH_MASK) {
307         case OBJ_SEARCH_OBJECT:
308                 right_key = object_right->uniqueid;
309                 /* Fall through */
310         case OBJ_SEARCH_KEY:
311                 cmp = strcmp(object_left->uniqueid, right_key);
312                 break;
313         case OBJ_SEARCH_PARTIAL_KEY:
314                 /*
315                  * We could also use a partial key struct containing a length
316                  * so strlen() does not get called for every comparison instead.
317                  */
318                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
319                 break;
320         default:
321                 /*
322                  * What arg points to is specific to this traversal callback
323                  * and has no special meaning to astobj2.
324                  */
325                 cmp = 0;
326                 break;
327         }
328         if (cmp) {
329                 return 0;
330         }
331         /*
332          * At this point the traversal callback is identical to a sorted
333          * container.
334          */
335         return CMP_MATCH;
336 }
337
/*!
 * \brief Link between a bridge and the music on hold channel created for it.
 *
 * Used with app_bridges_moh, which is hashed and sorted by bridge_id.
 *
 * - channel_id: unique id of the music on hold channel.
 * - bridge_id: unique id of the bridge the channel was created for.
 */
struct stasis_app_bridge_moh_wrapper {
        AST_DECLARE_STRING_FIELDS(
                AST_STRING_FIELD(channel_id);
                AST_STRING_FIELD(bridge_id);
        );
};
348
/*!
 * \brief AO2 destructor for \ref stasis_app_bridge_moh_wrapper.
 *
 * \param obj The wrapper being destroyed; its string fields are released.
 */
static void stasis_app_bridge_moh_wrapper_destructor(void *obj)
{
        struct stasis_app_bridge_moh_wrapper *doomed = obj;

        ast_string_field_free_memory(doomed);
}
354
355 /*! AO2 hash function for the bridges moh container */
356 static int bridges_moh_hash_fn(const void *obj, const int flags)
357 {
358         const struct stasis_app_bridge_moh_wrapper *wrapper;
359         const char *key;
360
361         switch (flags & OBJ_SEARCH_MASK) {
362         case OBJ_SEARCH_KEY:
363                 key = obj;
364                 break;
365         case OBJ_SEARCH_OBJECT:
366                 wrapper = obj;
367                 key = wrapper->bridge_id;
368                 break;
369         default:
370                 /* Hash can only work on something with a full key. */
371                 ast_assert(0);
372                 return 0;
373         }
374         return ast_str_hash(key);
375 }
376
377 static int bridges_moh_sort_fn(const void *obj_left, const void *obj_right, const int flags)
378 {
379         const struct stasis_app_bridge_moh_wrapper *left = obj_left;
380         const struct stasis_app_bridge_moh_wrapper *right = obj_right;
381         const char *right_key = obj_right;
382         int cmp;
383
384         switch (flags & OBJ_SEARCH_MASK) {
385         case OBJ_SEARCH_OBJECT:
386                 right_key = right->bridge_id;
387                 /* Fall through */
388         case OBJ_SEARCH_KEY:
389                 cmp = strcmp(left->bridge_id, right_key);
390                 break;
391         case OBJ_SEARCH_PARTIAL_KEY:
392                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
393                 break;
394         default:
395                 /* Sort can only work on something with a full or partial key. */
396                 ast_assert(0);
397                 cmp = 0;
398                 break;
399         }
400         return cmp;
401 }
402
403 /*! Removes the bridge to music on hold channel link */
404 static void remove_bridge_moh(char *bridge_id)
405 {
406         ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
407         ast_free(bridge_id);
408 }
409
410 /*! After bridge failure callback for moh channels */
411 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
412 {
413         char *bridge_id = data;
414
415         remove_bridge_moh(bridge_id);
416 }
417
/*!
 * \brief After bridge callback for moh channels.
 *
 * \param chan Channel that left the bridge (unused).
 * \param data The bridge id string handed to remove_bridge_moh().
 */
static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
{
        remove_bridge_moh(data);
}
425
426 /*! Request a bridge MOH channel */
427 static struct ast_channel *prepare_bridge_moh_channel(void)
428 {
429         RAII_VAR(struct ast_format_cap *, cap, NULL, ast_format_cap_destroy);
430         struct ast_format format;
431
432         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_NOLOCK);
433         if (!cap) {
434                 return NULL;
435         }
436
437         ast_format_cap_add(cap, ast_format_set(&format, AST_FORMAT_SLINEAR, 0));
438
439         return ast_request("Announcer", cap, NULL, "ARI_MOH", NULL);
440 }
441
442 /*! Provides the moh channel with a thread so it can actually play its music */
443 static void *moh_channel_thread(void *data)
444 {
445         struct ast_channel *moh_channel = data;
446
447         while (!ast_safe_sleep(moh_channel, 1000)) {
448         }
449
450         ast_moh_stop(moh_channel);
451         ast_hangup(moh_channel);
452
453         return NULL;
454 }
455
456 /*!
457  * \internal
458  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
459  *
460  * \param bridge Which bridge this moh channel exists for
461  *
462  * \retval NULL if the channel could not be created, pushed, or linked
463  * \retval Reference to the channel on success
464  */
465 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
466 {
467         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, new_wrapper, NULL, ao2_cleanup);
468         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
469         struct ast_channel *chan;
470         pthread_t threadid;
471
472         if (!bridge_id) {
473                 return NULL;
474         }
475
476         chan = prepare_bridge_moh_channel();
477         if (!chan) {
478                 return NULL;
479         }
480
481         /* The after bridge callback assumes responsibility of the bridge_id. */
482         if (ast_bridge_set_after_callback(chan,
483                 moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
484                 ast_hangup(chan);
485                 return NULL;
486         }
487         bridge_id = NULL;
488
489         if (ast_unreal_channel_push_to_bridge(chan, bridge,
490                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
491                 ast_hangup(chan);
492                 return NULL;
493         }
494
495         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
496                 stasis_app_bridge_moh_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
497         if (!new_wrapper) {
498                 ast_hangup(chan);
499                 return NULL;
500         }
501
502         if (ast_string_field_init(new_wrapper, 32)) {
503                 ast_hangup(chan);
504                 return NULL;
505         }
506         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
507         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
508
509         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
510                 ast_hangup(chan);
511                 return NULL;
512         }
513
514         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
515                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
516                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
517                 ast_hangup(chan);
518                 return NULL;
519         }
520
521         return chan;
522 }
523
524 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
525 {
526         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
527
528         {
529                 SCOPED_AO2LOCK(lock, app_bridges_moh);
530
531                 moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
532                 if (!moh_wrapper) {
533                         return bridge_moh_create(bridge);
534                 }
535         }
536
537         return ast_channel_get_by_name(moh_wrapper->channel_id);
538 }
539
540 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
541 {
542         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
543         struct ast_channel *chan;
544
545         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
546         if (!moh_wrapper) {
547                 return -1;
548         }
549
550         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
551         if (!chan) {
552                 return -1;
553         }
554
555         ast_moh_stop(chan);
556         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
557         ao2_cleanup(chan);
558
559         return 0;
560 }
561
562 struct ast_bridge *stasis_app_bridge_find_by_id(
563         const char *bridge_id)
564 {
565         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
566 }
567
568
569 /*!
570  * \brief In addition to running ao2_cleanup(), this function also removes the
571  * object from the app_controls container.
572  */
573 static void control_unlink(struct stasis_app_control *control)
574 {
575         if (!control) {
576                 return;
577         }
578
579         ao2_unlink(app_controls, control);
580         ao2_cleanup(control);
581 }
582
583 struct ast_bridge *stasis_app_bridge_create(const char *type)
584 {
585         struct ast_bridge *bridge;
586         int capabilities;
587         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
588                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
589                 | AST_BRIDGE_FLAG_TRANSFER_PROHIBITED;
590
591         if (ast_strlen_zero(type) || !strcmp(type, "mixing")) {
592                 capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
593                         AST_BRIDGE_CAPABILITY_MULTIMIX |
594                         AST_BRIDGE_CAPABILITY_NATIVE;
595                 flags |= AST_BRIDGE_FLAG_SMART;
596         } else if (!strcmp(type, "holding")) {
597                 capabilities = AST_BRIDGE_CAPABILITY_HOLDING;
598         } else {
599                 return NULL;
600         }
601
602         bridge = ast_bridge_base_new(capabilities, flags);
603         if (bridge) {
604                 if (!ao2_link(app_bridges, bridge)) {
605                         ast_bridge_destroy(bridge, 0);
606                         bridge = NULL;
607                 }
608         }
609         return bridge;
610 }
611
612 void stasis_app_bridge_destroy(const char *bridge_id)
613 {
614         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
615         if (!bridge) {
616                 return;
617         }
618         ao2_unlink(app_bridges, bridge);
619         ast_bridge_destroy(bridge, 0);
620 }
621
622 static int send_start_msg(struct app *app, struct ast_channel *chan,
623         int argc, char *argv[])
624 {
625         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
626         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
627
628         struct ast_json *json_args;
629         int i;
630
631         ast_assert(chan != NULL);
632
633         /* Set channel info */
634         snapshot = ast_channel_snapshot_create(chan);
635         if (!snapshot) {
636                 return -1;
637         }
638
639         msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
640                 "type", "StasisStart",
641                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
642                 "args",
643                 "channel", ast_channel_snapshot_to_json(snapshot));
644         if (!msg) {
645                 return -1;
646         }
647
648         /* Append arguments to args array */
649         json_args = ast_json_object_get(msg, "args");
650         ast_assert(json_args != NULL);
651         for (i = 0; i < argc; ++i) {
652                 int r = ast_json_array_append(json_args,
653                                               ast_json_string_create(argv[i]));
654                 if (r != 0) {
655                         ast_log(LOG_ERROR, "Error appending start message\n");
656                         return -1;
657                 }
658         }
659
660         app_send(app, msg);
661         return 0;
662 }
663
664 static int send_end_msg(struct app *app, struct ast_channel *chan)
665 {
666         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
667         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
668
669         ast_assert(chan != NULL);
670
671         /* Set channel info */
672         snapshot = ast_channel_snapshot_create(chan);
673         if (snapshot == NULL) {
674                 return -1;
675         }
676
677         msg = ast_json_pack("{s: s, s: o, s: o}",
678                 "type", "StasisEnd",
679                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
680                 "channel", ast_channel_snapshot_to_json(snapshot));
681         if (!msg) {
682                 return -1;
683         }
684
685         app_send(app, msg);
686         return 0;
687 }
688
/*!
 * \brief Dispatch queued control commands until there is nothing left to do.
 *
 * Stops when the control is done, when a dispatch pass runs no commands, or
 * when the channel's fdno is -1 after a pass.
 *
 * \param chan Channel the commands are executed on.
 * \param control Control whose command queue is drained.
 */
void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
{
        while (!control_is_done(control)) {
                if (control_dispatch_all(control, chan) == 0) {
                        break;
                }
                if (ast_channel_fdno(chan) == -1) {
                        break;
                }
        }
}
698
699 /*! /brief Stasis dialplan application callback */
700 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
701                     char *argv[])
702 {
703         SCOPED_MODULE_USE(ast_module_info->self);
704
705         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
706         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
707         struct ast_bridge *last_bridge = NULL;
708         int res = 0;
709
710         ast_assert(chan != NULL);
711
712         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
713         if (!app) {
714                 ast_log(LOG_ERROR,
715                         "Stasis app '%s' not registered\n", app_name);
716                 return -1;
717         }
718         if (!app_is_active(app)) {
719                 ast_log(LOG_ERROR,
720                         "Stasis app '%s' not active\n", app_name);
721                 return -1;
722         }
723
724         control = control_create(chan);
725         if (!control) {
726                 ast_log(LOG_ERROR, "Allocated failed\n");
727                 return -1;
728         }
729         ao2_link(app_controls, control);
730
731         res = send_start_msg(app, chan, argc, argv);
732         if (res != 0) {
733                 ast_log(LOG_ERROR,
734                         "Error sending start message to '%s'\n", app_name);
735                 return -1;
736         }
737
738         res = app_subscribe_channel(app, chan);
739         if (res != 0) {
740                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
741                         app_name, ast_channel_name(chan));
742                 return -1;
743         }
744
745         while (!control_is_done(control)) {
746                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
747                 int r;
748                 int command_count;
749                 struct ast_bridge *bridge = NULL;
750
751                 /* Check to see if a bridge absorbed our hangup frame */
752                 if (ast_check_hangup_locked(chan)) {
753                         break;
754                 }
755
756                 last_bridge = bridge;
757                 bridge = stasis_app_get_bridge(control);
758
759                 if (bridge != last_bridge) {
760                         app_unsubscribe_bridge(app, last_bridge);
761                         app_subscribe_bridge(app, bridge);
762                 }
763
764                 if (bridge) {
765                         /* Bridge is handling channel frames */
766                         control_wait(control);
767                         control_dispatch_all(control, chan);
768                         continue;
769                 }
770
771                 r = ast_waitfor(chan, MAX_WAIT_MS);
772
773                 if (r < 0) {
774                         ast_debug(3, "%s: Poll error\n",
775                                   ast_channel_uniqueid(chan));
776                         break;
777                 }
778
779                 command_count = control_dispatch_all(control, chan);
780
781                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
782                         /* Command drained the channel; wait for next frame */
783                         continue;
784                 }
785
786                 if (r == 0) {
787                         /* Timeout */
788                         continue;
789                 }
790
791                 f = ast_read(chan);
792                 if (!f) {
793                         /* Continue on in the dialplan */
794                         ast_debug(3, "%s: Hangup (no more frames)\n",
795                                 ast_channel_uniqueid(chan));
796                         break;
797                 }
798
799                 if (f->frametype == AST_FRAME_CONTROL) {
800                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
801                                 /* Continue on in the dialplan */
802                                 ast_debug(3, "%s: Hangup\n",
803                                         ast_channel_uniqueid(chan));
804                                 break;
805                         }
806                 }
807         }
808
809         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
810         app_unsubscribe_channel(app, chan);
811
812         res = send_end_msg(app, chan);
813         if (res != 0) {
814                 ast_log(LOG_ERROR,
815                         "Error sending end message to %s\n", app_name);
816                 return res;
817         }
818
819         /* There's an off chance that app is ready for cleanup. Go ahead
820          * and clean up, just in case
821          */
822         cleanup();
823
824         return res;
825 }
826
827 int stasis_app_send(const char *app_name, struct ast_json *message)
828 {
829         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
830
831         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
832         if (!app) {
833                 /* XXX We can do a better job handling late binding, queueing up
834                  * the call for a few seconds to wait for the app to register.
835                  */
836                 ast_log(LOG_WARNING,
837                         "Stasis app '%s' not registered\n", app_name);
838                 return -1;
839         }
840
841         app_send(app, message);
842         return 0;
843 }
844
/*!
 * \brief ao2_callback() handler that collects application names.
 *
 * \param obj The \ref app being visited.
 * \param arg String container that receives the application's name.
 * \param flags Unused.
 *
 * \return Always 0.
 */
static int append_name(void *obj, void *arg, int flags)
{
        struct ao2_container *names = arg;
        struct app *visited = obj;

        ast_str_container_add(names, app_name(visited));

        return 0;
}
853
854 struct ao2_container *stasis_app_get_all(void)
855 {
856         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
857
858         apps = ast_str_container_alloc(1);
859         if (!apps) {
860                 return NULL;
861         }
862
863         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
864
865         return ao2_bump(apps);
866 }
867
868 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
869 {
870         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
871
872         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
873
874         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
875         if (app) {
876                 app_update(app, handler, data);
877         } else {
878                 app = app_create(app_name, handler, data);
879                 if (app) {
880                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
881                 } else {
882                         return -1;
883                 }
884         }
885
886         /* We lazily clean up the apps_registry, because it's good enough to
887          * prevent memory leaks, and we're lazy.
888          */
889         cleanup();
890         return 0;
891 }
892
893 void stasis_app_unregister(const char *app_name)
894 {
895         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
896
897         if (!app_name) {
898                 return;
899         }
900
901         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
902         if (!app) {
903                 ast_log(LOG_ERROR,
904                         "Stasis app '%s' not registered\n", app_name);
905                 return;
906         }
907
908         app_deactivate(app);
909
910         /* There's a decent chance that app is ready for cleanup. Go ahead
911          * and clean up, just in case
912          */
913         cleanup();
914 }
915
916 struct ast_json *stasis_app_to_json(const char *app_name)
917 {
918         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
919
920         if (app_name) {
921                 app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
922         }
923
924         if (!app) {
925                 return NULL;
926         }
927
928         return app_to_json(app);
929 }
930
/*!
 * URI scheme prefixes recognised for event sources by stasis_app_subscribe()
 * and stasis_app_unsubscribe(). Whatever follows the prefix in the URI is
 * used as the identifier of the channel, bridge, or endpoint.
 */
#define CHANNEL_SCHEME "channel:"
#define BRIDGE_SCHEME "bridge:"
#define ENDPOINT_SCHEME "endpoint:"
934
/*!
 * Struct for capturing event source information.
 *
 * stasis_app_subscribe() fills in one of these per event source URI.
 * \c event_source_type tells which member of the anonymous union is in use.
 */
struct event_source {
        /*! Discriminator for the union below */
        enum {
                /*! "channel:" URI; \c channel is the active member.
                 * First enumerator, hence value 0. */
                EVENT_SOURCE_CHANNEL,
                /*! "bridge:" URI; \c bridge is the active member */
                EVENT_SOURCE_BRIDGE,
                /*! "endpoint:" URI; \c endpoint is the active member */
                EVENT_SOURCE_ENDPOINT,
        } event_source_type;
        /*! Object looked up from the URI; released again before
         * stasis_app_subscribe() returns */
        union {
                struct ast_channel *channel;
                struct ast_bridge *bridge;
                struct ast_endpoint *endpoint;
        };
};
948
949 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
950         const char **event_source_uris, int event_sources_count,
951         struct ast_json **json)
952 {
953         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
954         RAII_VAR(struct event_source *, event_sources, NULL, ast_free);
955         enum stasis_app_subscribe_res res = STASIS_ASR_OK;
956         int i;
957
958         if (app_name) {
959                 app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
960         }
961
962         if (!app) {
963                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
964                         app_name ? : "(null)");
965                 return STASIS_ASR_APP_NOT_FOUND;
966         }
967
968         event_sources = ast_calloc(event_sources_count, sizeof(*event_sources));
969         if (!event_sources) {
970                 return STASIS_ASR_INTERNAL_ERROR;
971         }
972
973         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
974                 const char *uri = event_source_uris[i];
975                 ast_debug(3, "%s: Checking %s\n", app_name,
976                         uri);
977                 if (ast_begins_with(uri, CHANNEL_SCHEME)) {
978                         event_sources[i].event_source_type =
979                                 EVENT_SOURCE_CHANNEL;
980                         event_sources[i].channel = ast_channel_get_by_name(
981                                 uri + strlen(CHANNEL_SCHEME));
982                         if (!event_sources[i].channel) {
983                                 ast_log(LOG_WARNING, "Channel not found: %s\n", uri);
984                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
985                         }
986                 } else if (ast_begins_with(uri, BRIDGE_SCHEME)) {
987                         event_sources[i].event_source_type =
988                                 EVENT_SOURCE_BRIDGE;
989                         event_sources[i].bridge = stasis_app_bridge_find_by_id(
990                                 uri + strlen(BRIDGE_SCHEME));
991                         if (!event_sources[i].bridge) {
992                                 ast_log(LOG_WARNING, "Bridge not found: %s\n", uri);
993                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
994                         }
995                 } else if (ast_begins_with(uri, ENDPOINT_SCHEME)) {
996                         event_sources[i].event_source_type =
997                                 EVENT_SOURCE_ENDPOINT;
998                         event_sources[i].endpoint = ast_endpoint_find_by_id(
999                                 uri + strlen(ENDPOINT_SCHEME));
1000                         if (!event_sources[i].endpoint) {
1001                                 ast_log(LOG_WARNING, "Endpoint not found: %s\n", uri);
1002                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1003                         }
1004                 } else {
1005                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1006                         res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1007                 }
1008         }
1009
1010         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1011                 int sub_res = -1;
1012                 ast_debug(1, "%s: Subscribing to %s\n", app_name,
1013                         event_source_uris[i]);
1014
1015                 switch (event_sources[i].event_source_type) {
1016                 case EVENT_SOURCE_CHANNEL:
1017                         sub_res = app_subscribe_channel(app,
1018                                 event_sources[i].channel);
1019                         break;
1020                 case EVENT_SOURCE_BRIDGE:
1021                         sub_res = app_subscribe_bridge(app,
1022                                 event_sources[i].bridge);
1023                         break;
1024                 case EVENT_SOURCE_ENDPOINT:
1025                         sub_res = app_subscribe_endpoint(app,
1026                                 event_sources[i].endpoint);
1027                         break;
1028                 }
1029
1030                 if (sub_res != 0) {
1031                         ast_log(LOG_WARNING,
1032                                 "Error subscribing app '%s' to '%s'\n",
1033                                 app_name, event_source_uris[i]);
1034                         res = STASIS_ASR_INTERNAL_ERROR;
1035                 }
1036         }
1037
1038         if (res == STASIS_ASR_OK && json) {
1039                 ast_debug(1, "%s: Successful; setting results\n", app_name);
1040                 *json = app_to_json(app);
1041         }
1042
1043         for (i = 0; i < event_sources_count; ++i) {
1044                 switch (event_sources[i].event_source_type) {
1045                 case EVENT_SOURCE_CHANNEL:
1046                         event_sources[i].channel =
1047                                 ast_channel_cleanup(event_sources[i].channel);
1048                         break;
1049                 case EVENT_SOURCE_BRIDGE:
1050                         ao2_cleanup(event_sources[i].bridge);
1051                         event_sources[i].bridge = NULL;
1052                         break;
1053                 case EVENT_SOURCE_ENDPOINT:
1054                         ao2_cleanup(event_sources[i].endpoint);
1055                         event_sources[i].endpoint = NULL;
1056                         break;
1057                 }
1058         }
1059
1060         return res;
1061 }
1062
1063 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1064         const char **event_source_uris, int event_sources_count,
1065         struct ast_json **json)
1066 {
1067         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
1068         enum stasis_app_subscribe_res res = STASIS_ASR_OK;
1069         int i;
1070
1071         if (app_name) {
1072                 app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1073         }
1074
1075         if (!app) {
1076                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
1077                         app_name ? : "(null)");
1078                 return STASIS_ASR_APP_NOT_FOUND;
1079         }
1080
1081         /* Validate the input */
1082         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1083                 if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
1084                         const char *channel_id = event_source_uris[i] +
1085                                 strlen(CHANNEL_SCHEME);
1086                         if (!app_is_subscribed_channel_id(app, channel_id)) {
1087                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1088                         }
1089                 } else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
1090                         const char *bridge_id = event_source_uris[i] +
1091                                 strlen(BRIDGE_SCHEME);
1092                         if (!app_is_subscribed_bridge_id(app, bridge_id)) {
1093                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1094                         }
1095                 } else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
1096                         const char *endpoint_id = event_source_uris[i] +
1097                                 strlen(ENDPOINT_SCHEME);
1098                         if (!app_is_subscribed_endpoint_id(app, endpoint_id)) {
1099                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1100                         }
1101                 } else {
1102                         res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1103                 }
1104         }
1105
1106         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1107                 if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
1108                         const char *channel_id = event_source_uris[i] +
1109                                 strlen(CHANNEL_SCHEME);
1110                         app_unsubscribe_channel_id(app, channel_id);
1111                 } else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
1112                         const char *bridge_id = event_source_uris[i] +
1113                                 strlen(BRIDGE_SCHEME);
1114                         app_unsubscribe_bridge_id(app, bridge_id);
1115                 } else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
1116                         const char *endpoint_id = event_source_uris[i] +
1117                                 strlen(ENDPOINT_SCHEME);
1118                         app_unsubscribe_endpoint_id(app, endpoint_id);
1119                 }
1120         }
1121
1122         if (res == STASIS_ASR_OK && json) {
1123                 *json = app_to_json(app);
1124         }
1125
1126         return res;
1127 }
1128
/*!
 * \brief Take a reference on this module via ast_module_ref().
 *
 * NOTE(review): presumably used to keep the module from being unloaded while
 * in use; each call should be paired with stasis_app_unref() -- confirm
 * against callers.
 */
void stasis_app_ref(void)
{
        ast_module_ref(ast_module_info->self);
}
1133
/*!
 * \brief Release a reference on this module via ast_module_unref().
 *
 * Counterpart of stasis_app_ref().
 */
void stasis_app_unref(void)
{
        ast_module_unref(ast_module_info->self);
}
1138
/*!
 * \brief Module unload hook: release the module's global containers.
 *
 * Each global is reset to NULL after its reference is dropped. load_module()
 * also calls this after a partial allocation failure, so any of the globals
 * may still be NULL on entry.
 *
 * \retval 0 always.
 */
static int unload_module(void)
{
        /* Registered applications */
        ao2_cleanup(apps_registry);
        apps_registry = NULL;

        /* Control objects */
        ao2_cleanup(app_controls);
        app_controls = NULL;

        /* Bridges */
        ao2_cleanup(app_bridges);
        app_bridges = NULL;

        /* Bridge music-on-hold entries */
        ao2_cleanup(app_bridges_moh);
        app_bridges_moh = NULL;

        return 0;
}
1155
1156 static int load_module(void)
1157 {
1158         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
1159         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
1160         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
1161         app_bridges_moh = ao2_container_alloc_hash(
1162                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1163                 37, bridges_moh_hash_fn, bridges_moh_sort_fn, NULL);
1164         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh) {
1165                 unload_module();
1166                 return AST_MODULE_LOAD_FAILURE;
1167         }
1168
1169         return AST_MODULE_LOAD_SUCCESS;
1170 }
1171
/*!
 * Module descriptor: names the module "Stasis application support" and wires
 * load_module()/unload_module() in as its load and unload callbacks.
 */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
        .load = load_module,
        .unload = unload_module,
        );