channel locking: Add locking for channel snapshot creation
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_message_router.h"
65 #include "asterisk/strings.h"
66 #include "stasis/app.h"
67 #include "stasis/control.h"
68 #include "asterisk/core_unreal.h"
69 #include "asterisk/musiconhold.h"
70 #include "asterisk/causes.h"
71 #include "asterisk/stringfields.h"
72 #include "asterisk/bridge_after.h"
73
74 /*! Time to wait for a frame in the application */
75 #define MAX_WAIT_MS 200
76
77 /*!
78  * \brief Number of buckets for the Stasis application hash table.  Remember to
79  * keep it a prime number!
80  */
81 #define APPS_NUM_BUCKETS 127
82
83 /*!
84  * \brief Number of buckets for the Stasis application hash table.  Remember to
85  * keep it a prime number!
86  */
87 #define CONTROLS_NUM_BUCKETS 127
88
89 /*!
90  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
91  * keep it a prime number!
92  */
93 #define BRIDGES_NUM_BUCKETS 127
94
95 /*!
96  * \brief Stasis application container.
97  */
98 struct ao2_container *apps_registry;
99
100 struct ao2_container *app_controls;
101
102 struct ao2_container *app_bridges;
103
104 struct ao2_container *app_bridges_moh;
105
106 const char *stasis_app_name(const struct stasis_app *app)
107 {
108         return app_name(app);
109 }
110
111 /*! AO2 hash function for \ref app */
112 static int app_hash(const void *obj, const int flags)
113 {
114         const struct stasis_app *app;
115         const char *key;
116
117         switch (flags & OBJ_SEARCH_MASK) {
118         case OBJ_SEARCH_KEY:
119                 key = obj;
120                 break;
121         case OBJ_SEARCH_OBJECT:
122                 app = obj;
123                 key = stasis_app_name(app);
124                 break;
125         default:
126                 /* Hash can only work on something with a full key. */
127                 ast_assert(0);
128                 return 0;
129         }
130         return ast_str_hash(key);
131 }
132
133 /*! AO2 comparison function for \ref app */
134 static int app_compare(void *obj, void *arg, int flags)
135 {
136         const struct stasis_app *object_left = obj;
137         const struct stasis_app *object_right = arg;
138         const char *right_key = arg;
139         int cmp;
140
141         switch (flags & OBJ_SEARCH_MASK) {
142         case OBJ_SEARCH_OBJECT:
143                 right_key = stasis_app_name(object_right);
144                 /* Fall through */
145         case OBJ_SEARCH_KEY:
146                 cmp = strcmp(stasis_app_name(object_left), right_key);
147                 break;
148         case OBJ_SEARCH_PARTIAL_KEY:
149                 /*
150                  * We could also use a partial key struct containing a length
151                  * so strlen() does not get called for every comparison instead.
152                  */
153                 cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
154                 break;
155         default:
156                 /*
157                  * What arg points to is specific to this traversal callback
158                  * and has no special meaning to astobj2.
159                  */
160                 cmp = 0;
161                 break;
162         }
163         if (cmp) {
164                 return 0;
165         }
166         /*
167          * At this point the traversal callback is identical to a sorted
168          * container.
169          */
170         return CMP_MATCH;
171 }
172
173 /*! AO2 hash function for \ref stasis_app_control */
174 static int control_hash(const void *obj, const int flags)
175 {
176         const struct stasis_app_control *control;
177         const char *key;
178
179         switch (flags & OBJ_SEARCH_MASK) {
180         case OBJ_SEARCH_KEY:
181                 key = obj;
182                 break;
183         case OBJ_SEARCH_OBJECT:
184                 control = obj;
185                 key = stasis_app_control_get_channel_id(control);
186                 break;
187         default:
188                 /* Hash can only work on something with a full key. */
189                 ast_assert(0);
190                 return 0;
191         }
192         return ast_str_hash(key);
193 }
194
195 /*! AO2 comparison function for \ref stasis_app_control */
196 static int control_compare(void *obj, void *arg, int flags)
197 {
198         const struct stasis_app_control *object_left = obj;
199         const struct stasis_app_control *object_right = arg;
200         const char *right_key = arg;
201         int cmp;
202
203         switch (flags & OBJ_SEARCH_MASK) {
204         case OBJ_SEARCH_OBJECT:
205                 right_key = stasis_app_control_get_channel_id(object_right);
206                 /* Fall through */
207         case OBJ_SEARCH_KEY:
208                 cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
209                 break;
210         case OBJ_SEARCH_PARTIAL_KEY:
211                 /*
212                  * We could also use a partial key struct containing a length
213                  * so strlen() does not get called for every comparison instead.
214                  */
215                 cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
216                 break;
217         default:
218                 /*
219                  * What arg points to is specific to this traversal callback
220                  * and has no special meaning to astobj2.
221                  */
222                 cmp = 0;
223                 break;
224         }
225         if (cmp) {
226                 return 0;
227         }
228         /*
229          * At this point the traversal callback is identical to a sorted
230          * container.
231          */
232         return CMP_MATCH;
233 }
234
235 static int cleanup_cb(void *obj, void *arg, int flags)
236 {
237         struct stasis_app *app = obj;
238
239         if (!app_is_finished(app)) {
240                 return 0;
241         }
242
243         ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
244         app_shutdown(app);
245
246         return CMP_MATCH;
247
248 }
249
250 /*!
251  * \brief Clean up any old apps that we don't need any more.
252  */
253 static void cleanup(void)
254 {
255         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
256                 cleanup_cb, NULL);
257 }
258
259 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
260 {
261         return control_create(chan);
262 }
263
264 struct stasis_app_control *stasis_app_control_find_by_channel(
265         const struct ast_channel *chan)
266 {
267         if (chan == NULL) {
268                 return NULL;
269         }
270
271         return stasis_app_control_find_by_channel_id(
272                 ast_channel_uniqueid(chan));
273 }
274
275 struct stasis_app_control *stasis_app_control_find_by_channel_id(
276         const char *channel_id)
277 {
278         return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
279 }
280
281 /*! AO2 hash function for bridges container  */
282 static int bridges_hash(const void *obj, const int flags)
283 {
284         const struct ast_bridge *bridge;
285         const char *key;
286
287         switch (flags & OBJ_SEARCH_MASK) {
288         case OBJ_SEARCH_KEY:
289                 key = obj;
290                 break;
291         case OBJ_SEARCH_OBJECT:
292                 bridge = obj;
293                 key = bridge->uniqueid;
294                 break;
295         default:
296                 /* Hash can only work on something with a full key. */
297                 ast_assert(0);
298                 return 0;
299         }
300         return ast_str_hash(key);
301 }
302
303 /*! AO2 comparison function for bridges container */
304 static int bridges_compare(void *obj, void *arg, int flags)
305 {
306         const struct ast_bridge *object_left = obj;
307         const struct ast_bridge *object_right = arg;
308         const char *right_key = arg;
309         int cmp;
310
311         switch (flags & OBJ_SEARCH_MASK) {
312         case OBJ_SEARCH_OBJECT:
313                 right_key = object_right->uniqueid;
314                 /* Fall through */
315         case OBJ_SEARCH_KEY:
316                 cmp = strcmp(object_left->uniqueid, right_key);
317                 break;
318         case OBJ_SEARCH_PARTIAL_KEY:
319                 /*
320                  * We could also use a partial key struct containing a length
321                  * so strlen() does not get called for every comparison instead.
322                  */
323                 cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
324                 break;
325         default:
326                 /*
327                  * What arg points to is specific to this traversal callback
328                  * and has no special meaning to astobj2.
329                  */
330                 cmp = 0;
331                 break;
332         }
333         if (cmp) {
334                 return 0;
335         }
336         /*
337          * At this point the traversal callback is identical to a sorted
338          * container.
339          */
340         return CMP_MATCH;
341 }
342
343 /*!
344  *  Used with app_bridges_moh, provides links between bridges and existing music
345  *  on hold channels that are being used with them.
346  */
347 struct stasis_app_bridge_moh_wrapper {
348         AST_DECLARE_STRING_FIELDS(
349                 AST_STRING_FIELD(channel_id);
350                 AST_STRING_FIELD(bridge_id);
351         );
352 };
353
354 static void stasis_app_bridge_moh_wrapper_destructor(void *obj)
355 {
356         struct stasis_app_bridge_moh_wrapper *wrapper = obj;
357         ast_string_field_free_memory(wrapper);
358 }
359
360 /*! AO2 hash function for the bridges moh container */
361 static int bridges_moh_hash_fn(const void *obj, const int flags)
362 {
363         const struct stasis_app_bridge_moh_wrapper *wrapper;
364         const char *key;
365
366         switch (flags & OBJ_SEARCH_MASK) {
367         case OBJ_SEARCH_KEY:
368                 key = obj;
369                 break;
370         case OBJ_SEARCH_OBJECT:
371                 wrapper = obj;
372                 key = wrapper->bridge_id;
373                 break;
374         default:
375                 /* Hash can only work on something with a full key. */
376                 ast_assert(0);
377                 return 0;
378         }
379         return ast_str_hash(key);
380 }
381
382 static int bridges_moh_sort_fn(const void *obj_left, const void *obj_right, const int flags)
383 {
384         const struct stasis_app_bridge_moh_wrapper *left = obj_left;
385         const struct stasis_app_bridge_moh_wrapper *right = obj_right;
386         const char *right_key = obj_right;
387         int cmp;
388
389         switch (flags & OBJ_SEARCH_MASK) {
390         case OBJ_SEARCH_OBJECT:
391                 right_key = right->bridge_id;
392                 /* Fall through */
393         case OBJ_SEARCH_KEY:
394                 cmp = strcmp(left->bridge_id, right_key);
395                 break;
396         case OBJ_SEARCH_PARTIAL_KEY:
397                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
398                 break;
399         default:
400                 /* Sort can only work on something with a full or partial key. */
401                 ast_assert(0);
402                 cmp = 0;
403                 break;
404         }
405         return cmp;
406 }
407
408 /*! Removes the bridge to music on hold channel link */
409 static void remove_bridge_moh(char *bridge_id)
410 {
411         ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
412         ast_free(bridge_id);
413 }
414
415 /*! After bridge failure callback for moh channels */
416 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
417 {
418         char *bridge_id = data;
419
420         remove_bridge_moh(bridge_id);
421 }
422
423 /*! After bridge callback for moh channels */
424 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
425 {
426         char *bridge_id = data;
427
428         remove_bridge_moh(bridge_id);
429 }
430
431 /*! Request a bridge MOH channel */
432 static struct ast_channel *prepare_bridge_moh_channel(void)
433 {
434         RAII_VAR(struct ast_format_cap *, cap, NULL, ast_format_cap_destroy);
435         struct ast_format format;
436
437         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_NOLOCK);
438         if (!cap) {
439                 return NULL;
440         }
441
442         ast_format_cap_add(cap, ast_format_set(&format, AST_FORMAT_SLINEAR, 0));
443
444         return ast_request("Announcer", cap, NULL, "ARI_MOH", NULL);
445 }
446
447 /*! Provides the moh channel with a thread so it can actually play its music */
448 static void *moh_channel_thread(void *data)
449 {
450         struct ast_channel *moh_channel = data;
451
452         while (!ast_safe_sleep(moh_channel, 1000)) {
453         }
454
455         ast_moh_stop(moh_channel);
456         ast_hangup(moh_channel);
457
458         return NULL;
459 }
460
461 /*!
462  * \internal
463  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
464  *
465  * \param bridge Which bridge this moh channel exists for
466  *
467  * \retval NULL if the channel could not be created, pushed, or linked
468  * \retval Reference to the channel on success
469  */
470 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
471 {
472         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, new_wrapper, NULL, ao2_cleanup);
473         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
474         struct ast_channel *chan;
475         pthread_t threadid;
476
477         if (!bridge_id) {
478                 return NULL;
479         }
480
481         chan = prepare_bridge_moh_channel();
482         if (!chan) {
483                 return NULL;
484         }
485
486         /* The after bridge callback assumes responsibility of the bridge_id. */
487         if (ast_bridge_set_after_callback(chan,
488                 moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
489                 ast_hangup(chan);
490                 return NULL;
491         }
492         bridge_id = NULL;
493
494         if (ast_unreal_channel_push_to_bridge(chan, bridge,
495                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
496                 ast_hangup(chan);
497                 return NULL;
498         }
499
500         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
501                 stasis_app_bridge_moh_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
502         if (!new_wrapper) {
503                 ast_hangup(chan);
504                 return NULL;
505         }
506
507         if (ast_string_field_init(new_wrapper, 32)) {
508                 ast_hangup(chan);
509                 return NULL;
510         }
511         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
512         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
513
514         if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
515                 ast_hangup(chan);
516                 return NULL;
517         }
518
519         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
520                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
521                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
522                 ast_hangup(chan);
523                 return NULL;
524         }
525
526         return chan;
527 }
528
529 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
530 {
531         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
532
533         {
534                 SCOPED_AO2LOCK(lock, app_bridges_moh);
535
536                 moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
537                 if (!moh_wrapper) {
538                         return bridge_moh_create(bridge);
539                 }
540         }
541
542         return ast_channel_get_by_name(moh_wrapper->channel_id);
543 }
544
545 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
546 {
547         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
548         struct ast_channel *chan;
549
550         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
551         if (!moh_wrapper) {
552                 return -1;
553         }
554
555         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
556         if (!chan) {
557                 return -1;
558         }
559
560         ast_moh_stop(chan);
561         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
562         ao2_cleanup(chan);
563
564         return 0;
565 }
566
567 struct ast_bridge *stasis_app_bridge_find_by_id(
568         const char *bridge_id)
569 {
570         return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
571 }
572
573
574 /*!
575  * \brief In addition to running ao2_cleanup(), this function also removes the
576  * object from the app_controls container.
577  */
578 static void control_unlink(struct stasis_app_control *control)
579 {
580         if (!control) {
581                 return;
582         }
583
584         ao2_unlink(app_controls, control);
585         ao2_cleanup(control);
586 }
587
588 struct ast_bridge *stasis_app_bridge_create(const char *type, const char *name)
589 {
590         struct ast_bridge *bridge;
591         int capabilities;
592         int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
593                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
594                 | AST_BRIDGE_FLAG_TRANSFER_PROHIBITED;
595
596         if (ast_strlen_zero(type) || !strcmp(type, "mixing")) {
597                 capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
598                         AST_BRIDGE_CAPABILITY_MULTIMIX |
599                         AST_BRIDGE_CAPABILITY_NATIVE;
600                 flags |= AST_BRIDGE_FLAG_SMART;
601         } else if (!strcmp(type, "holding")) {
602                 capabilities = AST_BRIDGE_CAPABILITY_HOLDING;
603         } else {
604                 return NULL;
605         }
606
607         bridge = ast_bridge_base_new(capabilities, flags, "Stasis", name);
608         if (bridge) {
609                 if (!ao2_link(app_bridges, bridge)) {
610                         ast_bridge_destroy(bridge, 0);
611                         bridge = NULL;
612                 }
613         }
614         return bridge;
615 }
616
617 void stasis_app_bridge_destroy(const char *bridge_id)
618 {
619         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
620         if (!bridge) {
621                 return;
622         }
623         ao2_unlink(app_bridges, bridge);
624         ast_bridge_destroy(bridge, 0);
625 }
626
627 static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
628         int argc, char *argv[])
629 {
630         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
631         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
632
633         struct ast_json *json_args;
634         int i;
635         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
636
637         ast_assert(chan != NULL);
638
639         /* Set channel info */
640         ast_channel_lock(chan);
641         snapshot = ast_channel_snapshot_create(chan);
642         ast_channel_unlock(chan);
643         if (!snapshot) {
644                 return -1;
645         }
646
647         if (sanitize && sanitize->channel_snapshot
648                 && sanitize->channel_snapshot(snapshot)) {
649                 return 0;
650         }
651
652         msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
653                 "type", "StasisStart",
654                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
655                 "args",
656                 "channel", ast_channel_snapshot_to_json(snapshot, NULL));
657         if (!msg) {
658                 return -1;
659         }
660
661         /* Append arguments to args array */
662         json_args = ast_json_object_get(msg, "args");
663         ast_assert(json_args != NULL);
664         for (i = 0; i < argc; ++i) {
665                 int r = ast_json_array_append(json_args,
666                                               ast_json_string_create(argv[i]));
667                 if (r != 0) {
668                         ast_log(LOG_ERROR, "Error appending start message\n");
669                         return -1;
670                 }
671         }
672
673         app_send(app, msg);
674         return 0;
675 }
676
677 static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
678 {
679         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
680         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
681         struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
682
683         ast_assert(chan != NULL);
684
685         /* Set channel info */
686         ast_channel_lock(chan);
687         snapshot = ast_channel_snapshot_create(chan);
688         ast_channel_unlock(chan);
689         if (snapshot == NULL) {
690                 return -1;
691         }
692
693         if (sanitize && sanitize->channel_snapshot
694                 && sanitize->channel_snapshot(snapshot)) {
695                 return 0;
696         }
697
698         msg = ast_json_pack("{s: s, s: o, s: o}",
699                 "type", "StasisEnd",
700                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
701                 "channel", ast_channel_snapshot_to_json(snapshot, NULL));
702         if (!msg) {
703                 return -1;
704         }
705
706         app_send(app, msg);
707         return 0;
708 }
709
710 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
711 {
712         while (!control_is_done(control)) {
713                 int command_count = control_dispatch_all(control, chan);
714                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
715                         break;
716                 }
717         }
718 }
719
720 /*! /brief Stasis dialplan application callback */
721 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
722                     char *argv[])
723 {
724         SCOPED_MODULE_USE(ast_module_info->self);
725
726         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
727         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
728         struct ast_bridge *last_bridge = NULL;
729         int res = 0;
730
731         ast_assert(chan != NULL);
732
733         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
734         if (!app) {
735                 ast_log(LOG_ERROR,
736                         "Stasis app '%s' not registered\n", app_name);
737                 return -1;
738         }
739         if (!app_is_active(app)) {
740                 ast_log(LOG_ERROR,
741                         "Stasis app '%s' not active\n", app_name);
742                 return -1;
743         }
744
745         control = control_create(chan);
746         if (!control) {
747                 ast_log(LOG_ERROR, "Allocated failed\n");
748                 return -1;
749         }
750         ao2_link(app_controls, control);
751
752         res = send_start_msg(app, chan, argc, argv);
753         if (res != 0) {
754                 ast_log(LOG_ERROR,
755                         "Error sending start message to '%s'\n", app_name);
756                 return -1;
757         }
758
759         res = app_subscribe_channel(app, chan);
760         if (res != 0) {
761                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
762                         app_name, ast_channel_name(chan));
763                 return -1;
764         }
765
766         while (!control_is_done(control)) {
767                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
768                 int r;
769                 int command_count;
770                 struct ast_bridge *bridge = NULL;
771
772                 /* Check to see if a bridge absorbed our hangup frame */
773                 if (ast_check_hangup_locked(chan)) {
774                         break;
775                 }
776
777                 last_bridge = bridge;
778                 bridge = stasis_app_get_bridge(control);
779
780                 if (bridge != last_bridge) {
781                         app_unsubscribe_bridge(app, last_bridge);
782                         app_subscribe_bridge(app, bridge);
783                 }
784
785                 if (bridge) {
786                         /* Bridge is handling channel frames */
787                         control_wait(control);
788                         control_dispatch_all(control, chan);
789                         continue;
790                 }
791
792                 r = ast_waitfor(chan, MAX_WAIT_MS);
793
794                 if (r < 0) {
795                         ast_debug(3, "%s: Poll error\n",
796                                   ast_channel_uniqueid(chan));
797                         break;
798                 }
799
800                 command_count = control_dispatch_all(control, chan);
801
802                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
803                         /* Command drained the channel; wait for next frame */
804                         continue;
805                 }
806
807                 if (r == 0) {
808                         /* Timeout */
809                         continue;
810                 }
811
812                 f = ast_read(chan);
813                 if (!f) {
814                         /* Continue on in the dialplan */
815                         ast_debug(3, "%s: Hangup (no more frames)\n",
816                                 ast_channel_uniqueid(chan));
817                         break;
818                 }
819
820                 if (f->frametype == AST_FRAME_CONTROL) {
821                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
822                                 /* Continue on in the dialplan */
823                                 ast_debug(3, "%s: Hangup\n",
824                                         ast_channel_uniqueid(chan));
825                                 break;
826                         }
827                 }
828         }
829
830         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
831         app_unsubscribe_channel(app, chan);
832
833         res = send_end_msg(app, chan);
834         if (res != 0) {
835                 ast_log(LOG_ERROR,
836                         "Error sending end message to %s\n", app_name);
837                 return res;
838         }
839
840         /* There's an off chance that app is ready for cleanup. Go ahead
841          * and clean up, just in case
842          */
843         cleanup();
844
845         return res;
846 }
847
848 int stasis_app_send(const char *app_name, struct ast_json *message)
849 {
850         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
851
852         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
853         if (!app) {
854                 /* XXX We can do a better job handling late binding, queueing up
855                  * the call for a few seconds to wait for the app to register.
856                  */
857                 ast_log(LOG_WARNING,
858                         "Stasis app '%s' not registered\n", app_name);
859                 return -1;
860         }
861         app_send(app, message);
862         return 0;
863 }
864
865 static struct stasis_app *find_app_by_name(const char *app_name)
866 {
867         struct stasis_app *res = NULL;
868
869         if (!ast_strlen_zero(app_name)) {
870                 res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
871         }
872
873         if (!res) {
874                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
875                         app_name ? : "(null)");
876         }
877         return res;
878 }
879
880 static int append_name(void *obj, void *arg, int flags)
881 {
882         struct stasis_app *app = obj;
883         struct ao2_container *apps = arg;
884
885         ast_str_container_add(apps, stasis_app_name(app));
886         return 0;
887 }
888
889 struct ao2_container *stasis_app_get_all(void)
890 {
891         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
892
893         apps = ast_str_container_alloc(1);
894         if (!apps) {
895                 return NULL;
896         }
897
898         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
899
900         return ao2_bump(apps);
901 }
902
903 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
904 {
905         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
906
907         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
908
909         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
910         if (app) {
911                 app_update(app, handler, data);
912         } else {
913                 app = app_create(app_name, handler, data);
914                 if (app) {
915                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
916                 } else {
917                         return -1;
918                 }
919         }
920
921         /* We lazily clean up the apps_registry, because it's good enough to
922          * prevent memory leaks, and we're lazy.
923          */
924         cleanup();
925         return 0;
926 }
927
928 void stasis_app_unregister(const char *app_name)
929 {
930         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
931
932         if (!app_name) {
933                 return;
934         }
935
936         app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
937         if (!app) {
938                 ast_log(LOG_ERROR,
939                         "Stasis app '%s' not registered\n", app_name);
940                 return;
941         }
942
943         app_deactivate(app);
944
945         /* There's a decent chance that app is ready for cleanup. Go ahead
946          * and clean up, just in case
947          */
948         cleanup();
949 }
950
951 /*!
952  * \internal \brief List of registered event sources.
953  */
954 AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
955
956 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
957 {
958         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
959         AST_LIST_INSERT_TAIL(&event_sources, obj, next);
960         /* only need to bump the module ref on non-core sources because the
961            core ones are [un]registered by this module. */
962         if (!stasis_app_is_core_event_source(obj)) {
963                 ast_module_ref(ast_module_info->self);
964         }
965 }
966
967 void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
968 {
969         struct stasis_app_event_source *source;
970         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
971         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
972                 if (source == obj) {
973                         AST_RWLIST_REMOVE_CURRENT(next);
974                         if (!stasis_app_is_core_event_source(obj)) {
975                                 ast_module_unref(ast_module_info->self);
976                         }
977                         break;
978                 }
979         }
980         AST_RWLIST_TRAVERSE_SAFE_END;
981 }
982
983 /*!
984  * \internal
985  * \brief Convert event source data to JSON.
986  *
987  * Calls each event source that has a "to_json" handler allowing each
988  * source to add data to the given JSON object.
989  *
990  * \param app application associated with the event source
991  * \param json a json object to "fill"
992  *
993  * \retval The given json object.
994  */
995 static struct ast_json *app_event_sources_to_json(
996         const struct stasis_app *app, struct ast_json *json)
997 {
998         struct stasis_app_event_source *source;
999         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1000         AST_LIST_TRAVERSE(&event_sources, source, next) {
1001                 if (source->to_json) {
1002                         source->to_json(app, json);
1003                 }
1004         }
1005         return json;
1006 }
1007
1008 static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
1009 {
1010         if (!app) {
1011                 return NULL;
1012         }
1013
1014         return app_event_sources_to_json(app, app_to_json(app));
1015 }
1016
1017 struct ast_json *stasis_app_to_json(const char *app_name)
1018 {
1019         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1020
1021         return stasis_app_object_to_json(app);
1022 }
1023
1024 /*!
1025  * \internal
1026  * \brief Finds an event source that matches a uri scheme.
1027  *
1028  * Uri(s) should begin with a particular scheme that can be matched
1029  * against an event source.
1030  *
1031  * \param uri uri containing a scheme to match
1032  *
1033  * \retval an event source if found, NULL otherwise.
1034  */
1035 static struct stasis_app_event_source *app_event_source_find(const char *uri)
1036 {
1037         struct stasis_app_event_source *source;
1038         SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1039         AST_LIST_TRAVERSE(&event_sources, source, next) {
1040                 if (ast_begins_with(uri, source->scheme)) {
1041                         return source;
1042                 }
1043         }
1044         return NULL;
1045 }
1046
1047 /*!
1048  * \internal
1049  * \brief Callback for subscription handling
1050  *
1051  * \param app [un]subscribing application
1052  * \param uri scheme:id of an event source
1053  * \param event_source being [un]subscribed [from]to
1054  *
1055  * \retval stasis_app_subscribe_res return code.
1056  */
1057 typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
1058         struct stasis_app *app, const char *uri,
1059         struct stasis_app_event_source *event_source);
1060
1061 /*!
1062  * \internal
1063  * \brief Subscriptions handler for application [un]subscribing.
1064  *
1065  * \param app_name Name of the application to subscribe.
1066  * \param event_source_uris URIs for the event sources to subscribe to.
1067  * \param event_sources_count Array size of event_source_uris.
1068  * \param json Optional output pointer for JSON representation of the app
1069  *             after adding the subscription.
1070  * \param handler [un]subscribe handler
1071  *
1072  * \retval stasis_app_subscribe_res return code.
1073  */
1074 static enum stasis_app_subscribe_res app_handle_subscriptions(
1075         const char *app_name, const char **event_source_uris,
1076         int event_sources_count, struct ast_json **json,
1077         app_subscription_handler handler)
1078 {
1079         RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
1080         int i;
1081
1082         if (!app) {
1083                 return STASIS_ASR_APP_NOT_FOUND;
1084         }
1085
1086         for (i = 0; i < event_sources_count; ++i) {
1087                 const char *uri = event_source_uris[i];
1088                 enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
1089                 struct stasis_app_event_source *event_source;
1090
1091                 if (!(event_source = app_event_source_find(uri))) {
1092                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
1093                         return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
1094                 }
1095
1096                 if (handler &&
1097                     ((res = handler(app, uri, event_source)))) {
1098                         return res;
1099                 }
1100         }
1101
1102         if (json) {
1103                 ast_debug(3, "%s: Successful; setting results\n", app_name);
1104                 *json = stasis_app_object_to_json(app);
1105         }
1106
1107         return STASIS_ASR_OK;
1108 }
1109
1110 /*!
1111  * \internal
1112  * \brief Subscribe an app to an event source.
1113  *
1114  * \param app subscribing application
1115  * \param uri scheme:id of an event source
1116  * \param event_source being subscribed to
1117  *
1118  * \retval stasis_app_subscribe_res return code.
1119  */
1120 static enum stasis_app_subscribe_res app_subscribe(
1121         struct stasis_app *app, const char *uri,
1122         struct stasis_app_event_source *event_source)
1123 {
1124         const char *app_name = stasis_app_name(app);
1125         RAII_VAR(void *, obj, NULL, ao2_cleanup);
1126
1127         ast_debug(3, "%s: Checking %s\n", app_name, uri);
1128
1129         if (!event_source->find ||
1130             (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
1131                 ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
1132                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1133         }
1134
1135         ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
1136
1137         if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
1138                 ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
1139                         app_name, uri);
1140                 return STASIS_ASR_INTERNAL_ERROR;
1141         }
1142
1143         return STASIS_ASR_OK;
1144 }
1145
1146 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
1147         const char **event_source_uris, int event_sources_count,
1148         struct ast_json **json)
1149 {
1150         return app_handle_subscriptions(
1151                 app_name, event_source_uris, event_sources_count,
1152                 json, app_subscribe);
1153 }
1154
1155 /*!
1156  * \internal
1157  * \brief Unsubscribe an app from an event source.
1158  *
1159  * \param app application to unsubscribe
1160  * \param uri scheme:id of an event source
1161  * \param event_source being unsubscribed from
1162  *
1163  * \retval stasis_app_subscribe_res return code.
1164  */
1165 static enum stasis_app_subscribe_res app_unsubscribe(
1166         struct stasis_app *app, const char *uri,
1167         struct stasis_app_event_source *event_source)
1168 {
1169         const char *app_name = stasis_app_name(app);
1170         const char *id = uri + strlen(event_source->scheme);
1171
1172         if (!event_source->is_subscribed ||
1173             (!event_source->is_subscribed(app, id))) {
1174                 return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
1175         }
1176
1177         ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
1178
1179         if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
1180                 ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
1181                         app_name, uri);
1182                 return -1;
1183         }
1184         return 0;
1185 }
1186
1187 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
1188         const char **event_source_uris, int event_sources_count,
1189         struct ast_json **json)
1190 {
1191         return app_handle_subscriptions(
1192                 app_name, event_source_uris, event_sources_count,
1193                 json, app_unsubscribe);
1194 }
1195
1196 void stasis_app_ref(void)
1197 {
1198         ast_module_ref(ast_module_info->self);
1199 }
1200
1201 void stasis_app_unref(void)
1202 {
1203         ast_module_unref(ast_module_info->self);
1204 }
1205
1206 static int unload_module(void)
1207 {
1208         stasis_app_unregister_event_sources();
1209
1210         ao2_cleanup(apps_registry);
1211         apps_registry = NULL;
1212
1213         ao2_cleanup(app_controls);
1214         app_controls = NULL;
1215
1216         ao2_cleanup(app_bridges);
1217         app_bridges = NULL;
1218
1219         ao2_cleanup(app_bridges_moh);
1220         app_bridges_moh = NULL;
1221
1222         return 0;
1223 }
1224
1225 /* \brief Sanitization callback for channel snapshots */
1226 static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
1227 {
1228         if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
1229                 return 0;
1230         }
1231         return 1;
1232 }
1233
1234 /* \brief Sanitization callback for channel unique IDs */
1235 static int channel_id_sanitizer(const char *id)
1236 {
1237         RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup);
1238
1239         return channel_snapshot_sanitizer(snapshot);
1240 }
1241
1242 /* \brief Sanitization callbacks for communication to Stasis applications */
1243 struct stasis_message_sanitizer app_sanitizer = {
1244         .channel_id = channel_id_sanitizer,
1245         .channel_snapshot = channel_snapshot_sanitizer,
1246 };
1247
1248 struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
1249 {
1250         return &app_sanitizer;
1251 }
1252
1253 static int load_module(void)
1254 {
1255         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
1256         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
1257         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
1258         app_bridges_moh = ao2_container_alloc_hash(
1259                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1260                 37, bridges_moh_hash_fn, bridges_moh_sort_fn, NULL);
1261         if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh) {
1262                 unload_module();
1263                 return AST_MODULE_LOAD_FAILURE;
1264         }
1265
1266         stasis_app_register_event_sources();
1267
1268         return AST_MODULE_LOAD_SUCCESS;
1269 }
1270
1271 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
1272         .load = load_module,
1273         .unload = unload_module,
1274         );