res_stasis.c: Made use the ao2_container callback templates.
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_message_router.h"
65 #include "asterisk/strings.h"
66 #include "stasis/app.h"
67 #include "stasis/control.h"
68 #include "asterisk/core_unreal.h"
69 #include "asterisk/musiconhold.h"
70 #include "asterisk/causes.h"
71 #include "asterisk/stringfields.h"
72 #include "asterisk/bridge_after.h"
73
74 /*! Time to wait for a frame in the application */
75 #define MAX_WAIT_MS 200
76
77 /*!
78  * \brief Number of buckets for the Stasis application hash table.  Remember to
79  * keep it a prime number!
80  */
81 #define APPS_NUM_BUCKETS 127
82
83 /*!
84  * \brief Number of buckets for the Stasis application hash table.  Remember to
85  * keep it a prime number!
86  */
87 #define CONTROLS_NUM_BUCKETS 127
88
89 /*!
90  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
91  * keep it a prime number!
92  */
93 #define BRIDGES_NUM_BUCKETS 127
94
95 /*!
96  * \brief Stasis application container.
97  */
98 struct ao2_container *apps_registry;
99
100 struct ao2_container *app_controls;
101
102 struct ao2_container *app_bridges;
103
104 struct ao2_container *app_bridges_moh;
105
106 /*! AO2 hash function for \ref app */
107 static int app_hash(const void *obj, const int flags)
108 {
109         const struct app *app;
110         const char *key;
111
112         switch (flags & OBJ_SEARCH_MASK) {
113         case OBJ_SEARCH_KEY:
114                 key = obj;
115                 break;
116         case OBJ_SEARCH_OBJECT:
117                 app = obj;
118                 key = app_name(app);
119                 break;
120         default:
121                 /* Hash can only work on something with a full key. */
122                 ast_assert(0);
123                 return 0;
124         }
125         return ast_str_hash(key);
126 }
127
128 /*! AO2 comparison function for \ref app */
129 static int app_compare(void *obj, void *arg, int flags)
130 {
131         const struct app *object_left = obj;
132         const struct app *object_right = arg;
133         const char *right_key = arg;
134         int cmp;
135
136         switch (flags & OBJ_SEARCH_MASK) {
137         case OBJ_SEARCH_OBJECT:
138                 right_key = app_name(object_right);
139                 /* Fall through */
140         case OBJ_SEARCH_KEY:
141                 cmp = strcmp(app_name(object_left), right_key);
142                 break;
143         case OBJ_SEARCH_PARTIAL_KEY:
144                 /*
145                  * We could also use a partial key struct containing a length
146                  * so strlen() does not get called for every comparison instead.
147                  */
148                 cmp = strncmp(app_name(object_left), right_key, strlen(right_key));
149                 break;
150         default:
151                 /*
152                  * What arg points to is specific to this traversal callback
153                  * and has no special meaning to astobj2.
154                  */
155                 cmp = 0;
156                 break;
157         }
158         if (cmp) {
159                 return 0;
160         }
161         /*
162          * At this point the traversal callback is identical to a sorted
163          * container.
164          */
165         return CMP_MATCH;
166 }
167
168 /*! AO2 hash function for \ref stasis_app_control */
169 static int control_hash(const void *obj, const int flags)
170 {
171         const struct stasis_app_control *control;
172         const char *key;
173
174         switch (flags & OBJ_SEARCH_MASK) {
175         case OBJ_SEARCH_KEY:
176                 key = obj;
177                 break;
178         case OBJ_SEARCH_OBJECT:
179                 control = obj;
180                 key = stasis_app_control_get_channel_id(control);
181                 break;
182         default:
183                 /* Hash can only work on something with a full key. */
184                 ast_assert(0);
185                 return 0;
186         }
187         return ast_str_hash(key);
188 }
189
190 /*! AO2 comparison function for \ref stasis_app_control */
191 static int control_compare(void *obj, void *arg, int flags)
192 {
193         const struct stasis_app_control *object_left = obj;
194         const struct stasis_app_control *object_right = arg;
195         const char *right_key = arg;
196         int cmp;
197
198         switch (flags & OBJ_SEARCH_MASK) {
199         case OBJ_SEARCH_OBJECT:
200                 right_key = stasis_app_control_get_channel_id(object_right);
201                 /* Fall through */
202         case OBJ_SEARCH_KEY:
203                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
204                 break;
205         case OBJ_SEARCH_PARTIAL_KEY:
206                 /*
207                  * We could also use a partial key struct containing a length
208                  * so strlen() does not get called for every comparison instead.
209                  */
210                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
211                 break;
212         default:
213                 /*
214                  * What arg points to is specific to this traversal callback
215                  * and has no special meaning to astobj2.
216                  */
217                 cmp = 0;
218                 break;
219         }
220         if (cmp) {
221                 return 0;
222         }
223         /*
224          * At this point the traversal callback is identical to a sorted
225          * container.
226          */
227         return CMP_MATCH;
228 }
229
230 static int cleanup_cb(void *obj, void *arg, int flags)
231 {
232         struct app *app = obj;
233
234         if (!app_is_finished(app)) {
235                 return 0;
236         }
237
238         ast_verb(1, "Shutting down application '%s'\n", app_name(app));
239         app_shutdown(app);
240
241         return CMP_MATCH;
242
243 }
244
245 /*!
246  * \brief Clean up any old apps that we don't need any more.
247  */
248 static void cleanup(void)
249 {
250         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
251                 cleanup_cb, NULL);
252 }
253
254 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
255 {
256         return control_create(chan);
257 }
258
259 struct stasis_app_control *stasis_app_control_find_by_channel(
260         const struct ast_channel *chan)
261 {
262         if (chan == NULL) {
263                 return NULL;
264         }
265
266         return stasis_app_control_find_by_channel_id(
267                 ast_channel_uniqueid(chan));
268 }
269
270 struct stasis_app_control *stasis_app_control_find_by_channel_id(
271         const char *channel_id)
272 {
273         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
274 }
275
276 /*! AO2 hash function for bridges container  */
277 static int bridges_hash(const void *obj, const int flags)
278 {
279         const struct ast_bridge *bridge;
280         const char *key;
281
282         switch (flags & OBJ_SEARCH_MASK) {
283         case OBJ_SEARCH_KEY:
284                 key = obj;
285                 break;
286         case OBJ_SEARCH_OBJECT:
287                 bridge = obj;
288                 key = bridge->uniqueid;
289                 break;
290         default:
291                 /* Hash can only work on something with a full key. */
292                 ast_assert(0);
293                 return 0;
294         }
295         return ast_str_hash(key);
296 }
297
298 /*! AO2 comparison function for bridges container */
299 static int bridges_compare(void *obj, void *arg, int flags)
300 {
301         const struct ast_bridge *object_left = obj;
302         const struct ast_bridge *object_right = arg;
303         const char *right_key = arg;
304         int cmp;
305
306         switch (flags & OBJ_SEARCH_MASK) {
307         case OBJ_SEARCH_OBJECT:
308                 right_key = object_right->uniqueid;
309                 /* Fall through */
310         case OBJ_SEARCH_KEY:
311                 cmp = strcmp(object_left->uniqueid, right_key);
312                 break;
313         case OBJ_SEARCH_PARTIAL_KEY:
314                 /*
315                  * We could also use a partial key struct containing a length
316                  * so strlen() does not get called for every comparison instead.
317                  */
318                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
319                 break;
320         default:
321                 /*
322                  * What arg points to is specific to this traversal callback
323                  * and has no special meaning to astobj2.
324                  */
325                 cmp = 0;
326                 break;
327         }
328         if (cmp) {
329                 return 0;
330         }
331         /*
332          * At this point the traversal callback is identical to a sorted
333          * container.
334          */
335         return CMP_MATCH;
336 }
337
338 /*!
339  *  Used with app_bridges_moh, provides links between bridges and existing music
340  *  on hold channels that are being used with them.
341  */
342 struct stasis_app_bridge_moh_wrapper {
343         AST_DECLARE_STRING_FIELDS(
344                 AST_STRING_FIELD(channel_id);
345                 AST_STRING_FIELD(bridge_id);
346         );
347 };
348
349 static void stasis_app_bridge_moh_wrapper_destructor(void *obj)
350 {
351         struct stasis_app_bridge_moh_wrapper *wrapper = obj;
352         ast_string_field_free_memory(wrapper);
353 }
354
355 /*! AO2 hash function for the bridges moh container */
356 static int bridges_moh_hash_fn(const void *obj, const int flags)
357 {
358         const struct stasis_app_bridge_moh_wrapper *wrapper;
359         const char *key;
360
361         switch (flags & OBJ_SEARCH_MASK) {
362         case OBJ_SEARCH_KEY:
363                 key = obj;
364                 break;
365         case OBJ_SEARCH_OBJECT:
366                 wrapper = obj;
367                 key = wrapper->bridge_id;
368                 break;
369         default:
370                 /* Hash can only work on something with a full key. */
371                 ast_assert(0);
372                 return 0;
373         }
374         return ast_str_hash(key);
375 }
376
377 static int bridges_moh_sort_fn(const void *obj_left, const void *obj_right, const int flags)
378 {
379         const struct stasis_app_bridge_moh_wrapper *left = obj_left;
380         const struct stasis_app_bridge_moh_wrapper *right = obj_right;
381         const char *right_key = obj_right;
382         int cmp;
383
384         switch (flags & OBJ_SEARCH_MASK) {
385         case OBJ_SEARCH_OBJECT:
386                 right_key = right->bridge_id;
387                 /* Fall through */
388         case OBJ_SEARCH_KEY:
389                 cmp = strcmp(left->bridge_id, right_key);
390                 break;
391         case OBJ_SEARCH_PARTIAL_KEY:
392                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
393                 break;
394         default:
395                 /* Sort can only work on something with a full or partial key. */
396                 ast_assert(0);
397                 cmp = 0;
398                 break;
399         }
400         return cmp;
401 }
402
403 /*! Removes the bridge to music on hold channel link */
404 static void remove_bridge_moh(char *bridge_id)
405 {
406         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY), ao2_cleanup);
407
408         if (moh_wrapper) {
409                 ao2_unlink_flags(app_bridges_moh, moh_wrapper, OBJ_NOLOCK);
410         }
411         ast_free(bridge_id);
412 }
413
414 /*! After bridge failure callback for moh channels */
415 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
416 {
417         char *bridge_id = data;
418
419         remove_bridge_moh(bridge_id);
420 }
421
422 /*! After bridge callback for moh channels */
423 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
424 {
425         char *bridge_id = data;
426
427         remove_bridge_moh(bridge_id);
428 }
429
430 /*! Request a bridge MOH channel */
431 static struct ast_channel *prepare_bridge_moh_channel(void)
432 {
433         RAII_VAR(struct ast_format_cap *, cap, NULL, ast_format_cap_destroy);
434         struct ast_format format;
435
436         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_NOLOCK);
437         if (!cap) {
438                 return NULL;
439         }
440
441         ast_format_cap_add(cap, ast_format_set(&format, AST_FORMAT_SLINEAR, 0));
442
443         return ast_request("Announcer", cap, NULL, "ARI_MOH", NULL);
444 }
445
446 /*! Provides the moh channel with a thread so it can actually play its music */
447 static void *moh_channel_thread(void *data)
448 {
449         struct ast_channel *moh_channel = data;
450
451         while (!ast_safe_sleep(moh_channel, 1000));
452
453         ast_moh_stop(moh_channel);
454         ast_hangup(moh_channel);
455
456         return NULL;
457 }
458
459 /*!
460  * \internal
461  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
462  *
463  * \param bridge Which bridge this moh channel exists for
464  *
465  * \retval NULL if the channel could not be created, pushed, or linked
466  * \retval Reference to the channel on success
467  */
468 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
469 {
470         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, new_wrapper, NULL, ao2_cleanup);
471         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
472         struct ast_channel *chan;
473         pthread_t threadid;
474
475         if (!bridge_id) {
476                 return NULL;
477         }
478
479         chan = prepare_bridge_moh_channel();
480
481         if (!chan) {
482                 return NULL;
483         }
484
485         /* The after bridge callback assumes responsibility of the bridge_id. */
486         ast_bridge_set_after_callback(chan, moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id);
487
488         bridge_id = NULL;
489
490         if (ast_unreal_channel_push_to_bridge(chan, bridge,
491                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
492                 ast_hangup(chan);
493                 return NULL;
494         }
495
496         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper), stasis_app_bridge_moh_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
497         if (!new_wrapper) {
498                 ast_hangup(chan);
499                 return NULL;
500         }
501
502         if (ast_string_field_init(new_wrapper, 32)) {
503                 ast_hangup(chan);
504                 return NULL;
505         }
506
507         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
508         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
509
510         if (!ao2_link(app_bridges_moh, new_wrapper)) {
511                 ast_hangup(chan);
512                 return NULL;
513         }
514
515         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
516                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
517                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
518                 ast_hangup(chan);
519                 return NULL;
520         }
521
522         return chan;
523 }
524
525 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
526 {
527         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
528
529         SCOPED_AO2LOCK(lock, app_bridges_moh);
530
531         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
532
533         if (!moh_wrapper) {
534                 struct ast_channel *bridge_moh_channel = bridge_moh_create(bridge);
535                 return bridge_moh_channel;
536         }
537
538         return ast_channel_get_by_name(moh_wrapper->channel_id);
539 }
540
541 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
542 {
543         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
544         struct ast_channel *chan;
545
546         SCOPED_AO2LOCK(lock, app_bridges_moh);
547
548         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
549
550         if (!moh_wrapper) {
551                 return -1;
552         }
553
554         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
555         if (!chan) {
556                 return -1;
557         }
558
559         ast_moh_stop(chan);
560         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
561         ao2_cleanup(chan);
562
563         ao2_unlink_flags(app_bridges_moh, moh_wrapper, OBJ_NOLOCK);
564
565         return 0;
566 }
567
568 struct ast_bridge *stasis_app_bridge_find_by_id(
569         const char *bridge_id)
570 {
571         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
572 }
573
574
575 /*!
576  * \brief In addition to running ao2_cleanup(), this function also removes the
577  * object from the app_controls container.
578  */
579 static void control_unlink(struct stasis_app_control *control)
580 {
581         if (!control) {
582                 return;
583         }
584
585         ao2_unlink_flags(app_controls, control,
586                 OBJ_SEARCH_OBJECT | OBJ_UNLINK | OBJ_NODATA);
587         ao2_cleanup(control);
588 }
589
590 struct ast_bridge *stasis_app_bridge_create(const char *type)
591 {
592         struct ast_bridge *bridge;
593         int capabilities, flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
594                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
595                 | AST_BRIDGE_FLAG_TRANSFER_PROHIBITED;
596
597         if (ast_strlen_zero(type) || !strcmp(type, "mixing")) {
598                 capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
599                         AST_BRIDGE_CAPABILITY_MULTIMIX |
600                         AST_BRIDGE_CAPABILITY_NATIVE;
601                 flags |= AST_BRIDGE_FLAG_SMART;
602         } else if (!strcmp(type, "holding")) {
603                 capabilities = AST_BRIDGE_CAPABILITY_HOLDING;
604         } else {
605                 return NULL;
606         }
607
608         bridge = ast_bridge_base_new(capabilities, flags);
609         if (bridge) {
610                 ao2_link(app_bridges, bridge);
611         }
612         return bridge;
613 }
614
615 void stasis_app_bridge_destroy(const char *bridge_id)
616 {
617         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
618         if (!bridge) {
619                 return;
620         }
621         ao2_unlink(app_bridges, bridge);
622         ast_bridge_destroy(bridge, 0);
623 }
624
625 static int send_start_msg(struct app *app, struct ast_channel *chan,
626         int argc, char *argv[])
627 {
628         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
629         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
630
631         struct ast_json *json_args;
632         int i;
633
634         ast_assert(chan != NULL);
635
636         /* Set channel info */
637         snapshot = ast_channel_snapshot_create(chan);
638         if (!snapshot) {
639                 return -1;
640         }
641
642         msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
643                 "type", "StasisStart",
644                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
645                 "args",
646                 "channel", ast_channel_snapshot_to_json(snapshot));
647         if (!msg) {
648                 return -1;
649         }
650
651         /* Append arguments to args array */
652         json_args = ast_json_object_get(msg, "args");
653         ast_assert(json_args != NULL);
654         for (i = 0; i < argc; ++i) {
655                 int r = ast_json_array_append(json_args,
656                                               ast_json_string_create(argv[i]));
657                 if (r != 0) {
658                         ast_log(LOG_ERROR, "Error appending start message\n");
659                         return -1;
660                 }
661         }
662
663         app_send(app, msg);
664         return 0;
665 }
666
667 static int send_end_msg(struct app *app, struct ast_channel *chan)
668 {
669         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
670         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
671
672         ast_assert(chan != NULL);
673
674         /* Set channel info */
675         snapshot = ast_channel_snapshot_create(chan);
676         if (snapshot == NULL) {
677                 return -1;
678         }
679
680         msg = ast_json_pack("{s: s, s: o, s: o}",
681                 "type", "StasisEnd",
682                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
683                 "channel", ast_channel_snapshot_to_json(snapshot));
684         if (!msg) {
685                 return -1;
686         }
687
688         app_send(app, msg);
689         return 0;
690 }
691
692 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
693 {
694         while (!control_is_done(control)) {
695                 int command_count = control_dispatch_all(control, chan);
696                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
697                         break;
698                 }
699         }
700 }
701
702 /*! /brief Stasis dialplan application callback */
703 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
704                     char *argv[])
705 {
706         SCOPED_MODULE_USE(ast_module_info->self);
707
708         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
709         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
710         int res = 0;
711
712         ast_assert(chan != NULL);
713
714         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
715         if (!app) {
716                 ast_log(LOG_ERROR,
717                         "Stasis app '%s' not registered\n", app_name);
718                 return -1;
719         }
720         if (!app_is_active(app)) {
721                 ast_log(LOG_ERROR,
722                         "Stasis app '%s' not active\n", app_name);
723                 return -1;
724         }
725
726         control = control_create(chan);
727         if (!control) {
728                 ast_log(LOG_ERROR, "Allocated failed\n");
729                 return -1;
730         }
731         ao2_link(app_controls, control);
732
733         res = send_start_msg(app, chan, argc, argv);
734         if (res != 0) {
735                 ast_log(LOG_ERROR,
736                         "Error sending start message to '%s'\n", app_name);
737                 return -1;
738         }
739
740         res = app_subscribe_channel(app, chan);
741         if (res != 0) {
742                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
743                         app_name, ast_channel_name(chan));
744                 return -1;
745         }
746
747         while (!control_is_done(control)) {
748                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
749                 int r;
750                 int command_count;
751                 struct ast_bridge *last_bridge = NULL;
752                 struct ast_bridge *bridge = NULL;
753
754                 /* Check to see if a bridge absorbed our hangup frame */
755                 if (ast_check_hangup_locked(chan)) {
756                         break;
757                 }
758
759                 last_bridge = bridge;
760                 bridge = stasis_app_get_bridge(control);
761
762                 if (bridge != last_bridge) {
763                         app_unsubscribe_bridge(app, last_bridge);
764                         app_subscribe_bridge(app, bridge);
765                 }
766
767                 if (bridge) {
768                         /* Bridge is handling channel frames */
769                         control_wait(control);
770                         control_dispatch_all(control, chan);
771                         continue;
772                 }
773
774                 r = ast_waitfor(chan, MAX_WAIT_MS);
775
776                 if (r < 0) {
777                         ast_debug(3, "%s: Poll error\n",
778                                   ast_channel_uniqueid(chan));
779                         break;
780                 }
781
782                 command_count = control_dispatch_all(control, chan);
783
784                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
785                         /* Command drained the channel; wait for next frame */
786                         continue;
787                 }
788
789                 if (r == 0) {
790                         /* Timeout */
791                         continue;
792                 }
793
794                 f = ast_read(chan);
795                 if (!f) {
796                         /* Continue on in the dialplan */
797                         ast_debug(3, "%s: Hangup (no more frames)\n",
798                                 ast_channel_uniqueid(chan));
799                         break;
800                 }
801
802                 if (f->frametype == AST_FRAME_CONTROL) {
803                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
804                                 /* Continue on in the dialplan */
805                                 ast_debug(3, "%s: Hangup\n",
806                                         ast_channel_uniqueid(chan));
807                                 break;
808                         }
809                 }
810         }
811
812         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
813         app_unsubscribe_channel(app, chan);
814
815         res = send_end_msg(app, chan);
816         if (res != 0) {
817                 ast_log(LOG_ERROR,
818                         "Error sending end message to %s\n", app_name);
819                 return res;
820         }
821
822         /* There's an off chance that app is ready for cleanup. Go ahead
823          * and clean up, just in case
824          */
825         cleanup();
826
827         return res;
828 }
829
830 int stasis_app_send(const char *app_name, struct ast_json *message)
831 {
832         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
833
834         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
835         if (!app) {
836                 /* XXX We can do a better job handling late binding, queueing up
837                  * the call for a few seconds to wait for the app to register.
838                  */
839                 ast_log(LOG_WARNING,
840                         "Stasis app '%s' not registered\n", app_name);
841                 return -1;
842         }
843
844         app_send(app, message);
845         return 0;
846 }
847
848 static int append_name(void *obj, void *arg, int flags)
849 {
850         struct app *app = obj;
851         struct ao2_container *apps = arg;
852
853         ast_str_container_add(apps, app_name(app));
854         return 0;
855 }
856
857 struct ao2_container *stasis_app_get_all(void)
858 {
859         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
860
861         apps = ast_str_container_alloc(1);
862         if (!apps) {
863                 return NULL;
864         }
865
866         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
867
868         return ao2_bump(apps);
869 }
870
871 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
872 {
873         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
874
875         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
876
877         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
878         if (app) {
879                 app_update(app, handler, data);
880         } else {
881                 app = app_create(app_name, handler, data);
882                 if (app) {
883                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
884                 } else {
885                         return -1;
886                 }
887         }
888
889         /* We lazily clean up the apps_registry, because it's good enough to
890          * prevent memory leaks, and we're lazy.
891          */
892         cleanup();
893         return 0;
894 }
895
896 void stasis_app_unregister(const char *app_name)
897 {
898         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
899
900         if (!app_name) {
901                 return;
902         }
903
904         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
905         if (!app) {
906                 ast_log(LOG_ERROR,
907                         "Stasis app '%s' not registered\n", app_name);
908                 return;
909         }
910
911         app_deactivate(app);
912
913         /* There's a decent chance that app is ready for cleanup. Go ahead
914          * and clean up, just in case
915          */
916         cleanup();
917 }
918
919 struct ast_json *stasis_app_to_json(const char *app_name)
920 {
921         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
922
923         if (app_name) {
924                 app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
925         }
926
927         if (!app) {
928                 return NULL;
929         }
930
931         return app_to_json(app);
932 }
933
934 #define CHANNEL_SCHEME "channel:"
935 #define BRIDGE_SCHEME "bridge:"
936 #define ENDPOINT_SCHEME "endpoint:"
937
938 /*! Struct for capturing event source information */
939 struct event_source {
940         enum {
941                 EVENT_SOURCE_CHANNEL,
942                 EVENT_SOURCE_BRIDGE,
943                 EVENT_SOURCE_ENDPOINT,
944         } event_source_type;
945         union {
946                 struct ast_channel *channel;
947                 struct ast_bridge *bridge;
948                 struct ast_endpoint *endpoint;
949         };
950 };
951
952 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
953         const char **event_source_uris, int event_sources_count,
954         struct ast_json **json)
955 {
956         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
957         RAII_VAR(struct event_source *, event_sources, NULL, ast_free);
958         enum stasis_app_subscribe_res res = STASIS_ASR_OK;
959         int i;
960
961         if (app_name) {
962                 app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
963         }
964
965         if (!app) {
966                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
967                         app_name ? : "(null)");
968                 return STASIS_ASR_APP_NOT_FOUND;
969         }
970
971         event_sources = ast_calloc(event_sources_count, sizeof(*event_sources));
972         if (!event_sources) {
973                 return STASIS_ASR_INTERNAL_ERROR;
974         }
975
976         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
977                 const char *uri = event_source_uris[i];
978                 ast_debug(3, "%s: Checking %s\n", app_name,
979                         uri);
980                 if (ast_begins_with(uri, CHANNEL_SCHEME)) {
981                         event_sources[i].event_source_type =
982                                 EVENT_SOURCE_CHANNEL;
983                         event_sources[i].channel = ast_channel_get_by_name(
984                                 uri + strlen(CHANNEL_SCHEME));
985                         if (!event_sources[i].channel) {
986                                 ast_log(LOG_WARNING, "Channel not found: %s\n", uri);
987                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
988                         }
989                 } else if (ast_begins_with(uri, BRIDGE_SCHEME)) {
990                         event_sources[i].event_source_type =
991                                 EVENT_SOURCE_BRIDGE;
992                         event_sources[i].bridge = stasis_app_bridge_find_by_id(
993                                 uri + strlen(BRIDGE_SCHEME));
994                         if (!event_sources[i].bridge) {
995                                 ast_log(LOG_WARNING, "Bridge not found: %s\n", uri);
996                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
997                         }
998                 } else if (ast_begins_with(uri, ENDPOINT_SCHEME)) {
999                         event_sources[i].event_source_type =
1000                                 EVENT_SOURCE_ENDPOINT;
1001                         event_sources[i].endpoint = ast_endpoint_find_by_id(
1002                                 uri + strlen(ENDPOINT_SCHEME));
1003                         if (!event_sources[i].endpoint) {
1004                                 ast_log(LOG_WARNING, "Endpoint not found: %s\n", uri);
1005                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1006                         }
1007                 } else {
1008                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1009                         res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1010                 }
1011         }
1012
1013         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1014                 int sub_res = -1;
1015                 ast_debug(1, "%s: Subscribing to %s\n", app_name,
1016                         event_source_uris[i]);
1017
1018                 switch (event_sources[i].event_source_type) {
1019                 case EVENT_SOURCE_CHANNEL:
1020                         sub_res = app_subscribe_channel(app,
1021                                 event_sources[i].channel);
1022                         break;
1023                 case EVENT_SOURCE_BRIDGE:
1024                         sub_res = app_subscribe_bridge(app,
1025                                 event_sources[i].bridge);
1026                         break;
1027                 case EVENT_SOURCE_ENDPOINT:
1028                         sub_res = app_subscribe_endpoint(app,
1029                                 event_sources[i].endpoint);
1030                         break;
1031                 }
1032
1033                 if (sub_res != 0) {
1034                         ast_log(LOG_WARNING,
1035                                 "Error subscribing app '%s' to '%s'\n",
1036                                 app_name, event_source_uris[i]);
1037                         res = STASIS_ASR_INTERNAL_ERROR;
1038                 }
1039         }
1040
1041         if (res == STASIS_ASR_OK && json) {
1042                 ast_debug(1, "%s: Successful; setting results\n", app_name);
1043                 *json = app_to_json(app);
1044         }
1045
1046         for (i = 0; i < event_sources_count; ++i) {
1047                 switch (event_sources[i].event_source_type) {
1048                 case EVENT_SOURCE_CHANNEL:
1049                         event_sources[i].channel =
1050                                 ast_channel_cleanup(event_sources[i].channel);
1051                         break;
1052                 case EVENT_SOURCE_BRIDGE:
1053                         ao2_cleanup(event_sources[i].bridge);
1054                         event_sources[i].bridge = NULL;
1055                         break;
1056                 case EVENT_SOURCE_ENDPOINT:
1057                         ao2_cleanup(event_sources[i].endpoint);
1058                         event_sources[i].endpoint = NULL;
1059                         break;
1060                 }
1061         }
1062
1063         return res;
1064 }
1065
1066 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1067         const char **event_source_uris, int event_sources_count,
1068         struct ast_json **json)
1069 {
1070         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
1071         enum stasis_app_subscribe_res res = STASIS_ASR_OK;
1072         int i;
1073
1074         if (app_name) {
1075                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
1076                         app_name ? : "(null)");
1077                 app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
1078         }
1079
1080         if (!app) {
1081                 return STASIS_ASR_APP_NOT_FOUND;
1082         }
1083
1084         /* Validate the input */
1085         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1086                 if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
1087                         const char *channel_id = event_source_uris[i] +
1088                                 strlen(CHANNEL_SCHEME);
1089                         if (!app_is_subscribed_channel_id(app, channel_id)) {
1090                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1091                         }
1092                 } else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
1093                         const char *bridge_id = event_source_uris[i] +
1094                                 strlen(BRIDGE_SCHEME);
1095                         if (!app_is_subscribed_bridge_id(app, bridge_id)) {
1096                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1097                         }
1098                 } else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
1099                         const char *endpoint_id = event_source_uris[i] +
1100                                 strlen(ENDPOINT_SCHEME);
1101                         if (!app_is_subscribed_endpoint_id(app, endpoint_id)) {
1102                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1103                         }
1104                 } else {
1105                         res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1106                 }
1107         }
1108
1109         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1110                 if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
1111                         const char *channel_id = event_source_uris[i] +
1112                                 strlen(CHANNEL_SCHEME);
1113                         app_unsubscribe_channel_id(app, channel_id);
1114                 } else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
1115                         const char *bridge_id = event_source_uris[i] +
1116                                 strlen(BRIDGE_SCHEME);
1117                         app_unsubscribe_bridge_id(app, bridge_id);
1118                 } else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
1119                         const char *endpoint_id = event_source_uris[i] +
1120                                 strlen(ENDPOINT_SCHEME);
1121                         app_unsubscribe_endpoint_id(app, endpoint_id);
1122                 }
1123         }
1124
1125         if (res == STASIS_ASR_OK && json) {
1126                 *json = app_to_json(app);
1127         }
1128
1129         return res;
1130 }
1131
1132 void stasis_app_ref(void)
1133 {
1134         ast_module_ref(ast_module_info->self);
1135 }
1136
1137 void stasis_app_unref(void)
1138 {
1139         ast_module_unref(ast_module_info->self);
1140 }
1141
1142 static int load_module(void)
1143 {
1144         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
1145                 app_compare);
1146         if (apps_registry == NULL) {
1147                 return AST_MODULE_LOAD_FAILURE;
1148         }
1149
1150         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
1151                 control_compare);
1152         if (app_controls == NULL) {
1153                 return AST_MODULE_LOAD_FAILURE;
1154         }
1155
1156         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
1157                 bridges_compare);
1158
1159         app_bridges_moh = ao2_container_alloc_hash(
1160                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1161                 37, bridges_moh_hash_fn, bridges_moh_sort_fn, NULL);
1162
1163         if (!app_bridges_moh) {
1164                 return AST_MODULE_LOAD_FAILURE;
1165         }
1166
1167         return AST_MODULE_LOAD_SUCCESS;
1168 }
1169
1170 static int unload_module(void)
1171 {
1172         ao2_cleanup(apps_registry);
1173         apps_registry = NULL;
1174
1175         ao2_cleanup(app_controls);
1176         app_controls = NULL;
1177
1178         ao2_cleanup(app_bridges);
1179         app_bridges = NULL;
1180
1181         ao2_cleanup(app_bridges_moh);
1182         app_bridges_moh = NULL;
1183
1184         return 0;
1185 }
1186
1187 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
1188         .load = load_module,
1189         .unload = unload_module,
1190         );