f0204c8ea621f90883c908078adea9b189b18319
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <support_level>core</support_level>
52  ***/
53
54 #include "asterisk.h"
55
56 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
57
58 #include "asterisk/astobj2.h"
59 #include "asterisk/callerid.h"
60 #include "asterisk/module.h"
61 #include "asterisk/stasis_app_impl.h"
62 #include "asterisk/stasis_channels.h"
63 #include "asterisk/stasis_bridges.h"
64 #include "asterisk/stasis_message_router.h"
65 #include "asterisk/strings.h"
66 #include "stasis/app.h"
67 #include "stasis/control.h"
68 #include "asterisk/core_unreal.h"
69 #include "asterisk/musiconhold.h"
70 #include "asterisk/causes.h"
71 #include "asterisk/stringfields.h"
72 #include "asterisk/bridge_after.h"
73
74 /*! Time to wait for a frame in the application */
75 #define MAX_WAIT_MS 200
76
77 /*!
78  * \brief Number of buckets for the Stasis application hash table.  Remember to
79  * keep it a prime number!
80  */
81 #define APPS_NUM_BUCKETS 127
82
83 /*!
84  * \brief Number of buckets for the Stasis application hash table.  Remember to
85  * keep it a prime number!
86  */
87 #define CONTROLS_NUM_BUCKETS 127
88
89 /*!
90  * \brief Number of buckets for the Stasis bridges hash table.  Remember to
91  * keep it a prime number!
92  */
93 #define BRIDGES_NUM_BUCKETS 127
94
95 /*!
96  * \brief Stasis application container.
97  */
98 struct ao2_container *apps_registry;
99
100 struct ao2_container *app_controls;
101
102 struct ao2_container *app_bridges;
103
104 struct ao2_container *app_bridges_moh;
105
106 /*! AO2 hash function for \ref app */
107 static int app_hash(const void *obj, const int flags)
108 {
109         const struct app *app = obj;
110         const char *name = flags & OBJ_KEY ? obj : app_name(app);
111
112         return ast_str_hash(name);
113 }
114
115 /*! AO2 comparison function for \ref app */
116 static int app_compare(void *lhs, void *rhs, int flags)
117 {
118         const struct app *lhs_app = lhs;
119         const struct app *rhs_app = rhs;
120         const char *lhs_name = app_name(lhs_app);
121         const char *rhs_name = flags & OBJ_KEY ? rhs : app_name(rhs_app);
122
123         if (strcmp(lhs_name, rhs_name) == 0) {
124                 return CMP_MATCH | CMP_STOP;
125         } else {
126                 return 0;
127         }
128 }
129
130 /*! AO2 hash function for \ref stasis_app_control */
131 static int control_hash(const void *obj, const int flags)
132 {
133         const struct stasis_app_control *control = obj;
134         const char *id = flags & OBJ_KEY ?
135                 obj : stasis_app_control_get_channel_id(control);
136
137         return ast_str_hash(id);
138 }
139
140 /*! AO2 comparison function for \ref stasis_app_control */
141 static int control_compare(void *lhs, void *rhs, int flags)
142 {
143         const struct stasis_app_control *lhs_control = lhs;
144         const struct stasis_app_control *rhs_control = rhs;
145         const char *lhs_id = stasis_app_control_get_channel_id(lhs_control);
146         const char *rhs_id = flags & OBJ_KEY ?
147                 rhs : stasis_app_control_get_channel_id(rhs_control);
148
149         if (strcmp(lhs_id, rhs_id) == 0) {
150                 return CMP_MATCH | CMP_STOP;
151         } else {
152                 return 0;
153         }
154 }
155
156 static int cleanup_cb(void *obj, void *arg, int flags)
157 {
158         struct app *app = obj;
159
160         if (!app_is_finished(app)) {
161                 return 0;
162         }
163
164         ast_verb(1, "Shutting down application '%s'\n", app_name(app));
165         app_shutdown(app);
166
167         return CMP_MATCH;
168
169 }
170
171 /*!
172  * \brief Clean up any old apps that we don't need any more.
173  */
174 static void cleanup(void)
175 {
176         ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
177                 cleanup_cb, NULL);
178 }
179
180 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
181 {
182         return control_create(chan);
183 }
184
185 struct stasis_app_control *stasis_app_control_find_by_channel(
186         const struct ast_channel *chan)
187 {
188         if (chan == NULL) {
189                 return NULL;
190         }
191
192         return stasis_app_control_find_by_channel_id(
193                 ast_channel_uniqueid(chan));
194 }
195
196 struct stasis_app_control *stasis_app_control_find_by_channel_id(
197         const char *channel_id)
198 {
199         return ao2_find(app_controls, channel_id, OBJ_KEY);
200 }
201
202 /*! AO2 hash function for bridges container  */
203 static int bridges_hash(const void *obj, const int flags)
204 {
205         const struct ast_bridge *bridge = obj;
206         const char *id = flags & OBJ_KEY ?
207                 obj : bridge->uniqueid;
208
209         return ast_str_hash(id);
210 }
211
212 /*! AO2 comparison function for bridges container */
213 static int bridges_compare(void *lhs, void *rhs, int flags)
214 {
215         const struct ast_bridge *lhs_bridge = lhs;
216         const struct ast_bridge *rhs_bridge = rhs;
217         const char *lhs_id = lhs_bridge->uniqueid;
218         const char *rhs_id = flags & OBJ_KEY ?
219                 rhs : rhs_bridge->uniqueid;
220
221         if (strcmp(lhs_id, rhs_id) == 0) {
222                 return CMP_MATCH | CMP_STOP;
223         } else {
224                 return 0;
225         }
226 }
227
228 /*!
229  *  Used with app_bridges_moh, provides links between bridges and existing music
230  *  on hold channels that are being used with them.
231  */
232 struct stasis_app_bridge_moh_wrapper {
233         AST_DECLARE_STRING_FIELDS(
234                 AST_STRING_FIELD(channel_id);
235                 AST_STRING_FIELD(bridge_id);
236         );
237 };
238
239 static void stasis_app_bridge_moh_wrapper_destructor(void *obj)
240 {
241         struct stasis_app_bridge_moh_wrapper *wrapper = obj;
242         ast_string_field_free_memory(wrapper);
243 }
244
245 /*! AO2 hash function for the bridges moh container */
246 static int bridges_moh_hash_fn(const void *obj, const int flags)
247 {
248         const struct stasis_app_bridge_moh_wrapper *wrapper;
249         const char *key;
250
251         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
252         case OBJ_KEY:
253                 key = obj;
254                 return ast_str_hash(key);
255         case OBJ_POINTER:
256                 wrapper = obj;
257                 return ast_str_hash(wrapper->bridge_id);
258         default:
259                 /* Hash can only work on something with a full key. */
260                 ast_assert(0);
261                 return 0;
262         }
263 }
264
265 static int bridges_moh_sort_fn(const void *obj_left, const void *obj_right, const int flags)
266 {
267         const struct stasis_app_bridge_moh_wrapper *left = obj_left;
268         const struct stasis_app_bridge_moh_wrapper *right = obj_right;
269         const char *right_key = obj_right;
270         int cmp;
271
272         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
273         case OBJ_POINTER:
274                 right_key = right->bridge_id;
275                 /* Fall through */
276         case OBJ_KEY:
277                 cmp = strcmp(left->bridge_id, right_key);
278                 break;
279         case OBJ_PARTIAL_KEY:
280                 cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
281                 break;
282         default:
283                 /* Sort can only work on something with a full or partial key. */
284                 ast_assert(0);
285                 cmp = 0;
286                 break;
287         }
288         return cmp;
289 }
290
291 /*! Removes the bridge to music on hold channel link */
292 static void remove_bridge_moh(char *bridge_id)
293 {
294         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, ao2_find(app_bridges_moh, bridge_id, OBJ_KEY), ao2_cleanup);
295
296         if (moh_wrapper) {
297                 ao2_unlink_flags(app_bridges_moh, moh_wrapper, OBJ_NOLOCK);
298         }
299         ast_free(bridge_id);
300 }
301
302 /*! After bridge failure callback for moh channels */
303 static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
304 {
305         char *bridge_id = data;
306
307         remove_bridge_moh(bridge_id);
308 }
309
310 /*! After bridge callback for moh channels */
311 static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
312 {
313         char *bridge_id = data;
314
315         remove_bridge_moh(bridge_id);
316 }
317
318 /*! Request a bridge MOH channel */
319 static struct ast_channel *prepare_bridge_moh_channel(void)
320 {
321         RAII_VAR(struct ast_format_cap *, cap, NULL, ast_format_cap_destroy);
322         struct ast_format format;
323
324         cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_NOLOCK);
325         if (!cap) {
326                 return NULL;
327         }
328
329         ast_format_cap_add(cap, ast_format_set(&format, AST_FORMAT_SLINEAR, 0));
330
331         return ast_request("Announcer", cap, NULL, "ARI_MOH", NULL);
332 }
333
334 /*! Provides the moh channel with a thread so it can actually play its music */
335 static void *moh_channel_thread(void *data)
336 {
337         struct ast_channel *moh_channel = data;
338
339         while (!ast_safe_sleep(moh_channel, 1000));
340
341         ast_moh_stop(moh_channel);
342         ast_hangup(moh_channel);
343
344         return NULL;
345 }
346
347 /*!
348  * \internal
349  * \brief Creates, pushes, and links a channel for playing music on hold to bridge
350  *
351  * \param bridge Which bridge this moh channel exists for
352  *
353  * \retval NULL if the channel could not be created, pushed, or linked
354  * \retval Reference to the channel on success
355  */
356 static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
357 {
358         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, new_wrapper, NULL, ao2_cleanup);
359         RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
360         struct ast_channel *chan;
361         pthread_t threadid;
362
363         if (!bridge_id) {
364                 return NULL;
365         }
366
367         chan = prepare_bridge_moh_channel();
368
369         if (!chan) {
370                 return NULL;
371         }
372
373         /* The after bridge callback assumes responsibility of the bridge_id. */
374         ast_bridge_set_after_callback(chan, moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id);
375
376         bridge_id = NULL;
377
378         if (ast_unreal_channel_push_to_bridge(chan, bridge,
379                 AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
380                 ast_hangup(chan);
381                 return NULL;
382         }
383
384         new_wrapper = ao2_alloc_options(sizeof(*new_wrapper), stasis_app_bridge_moh_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
385         if (!new_wrapper) {
386                 ast_hangup(chan);
387                 return NULL;
388         }
389
390         if (ast_string_field_init(new_wrapper, 32)) {
391                 ast_hangup(chan);
392                 return NULL;
393         }
394
395         ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
396         ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
397
398         if (!ao2_link(app_bridges_moh, new_wrapper)) {
399                 ast_hangup(chan);
400                 return NULL;
401         }
402
403         if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
404                 ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
405                 ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
406                 ast_hangup(chan);
407                 return NULL;
408         }
409
410         return chan;
411 }
412
413 struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
414 {
415         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
416
417         SCOPED_AO2LOCK(lock, app_bridges_moh);
418
419         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_KEY | OBJ_NOLOCK);
420
421         if (!moh_wrapper) {
422                 struct ast_channel *bridge_moh_channel = bridge_moh_create(bridge);
423                 return bridge_moh_channel;
424         }
425
426         return ast_channel_get_by_name(moh_wrapper->channel_id);
427 }
428
429 int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
430 {
431         RAII_VAR(struct stasis_app_bridge_moh_wrapper *, moh_wrapper, NULL, ao2_cleanup);
432         struct ast_channel *chan;
433
434         SCOPED_AO2LOCK(lock, app_bridges_moh);
435
436         moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_KEY | OBJ_NOLOCK);
437
438         if (!moh_wrapper) {
439                 return -1;
440         }
441
442         chan = ast_channel_get_by_name(moh_wrapper->channel_id);
443         if (!chan) {
444                 return -1;
445         }
446
447         ast_moh_stop(chan);
448         ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
449         ao2_cleanup(chan);
450
451         ao2_unlink_flags(app_bridges_moh, moh_wrapper, OBJ_NOLOCK);
452
453         return 0;
454 }
455
456 struct ast_bridge *stasis_app_bridge_find_by_id(
457         const char *bridge_id)
458 {
459         return ao2_find(app_bridges, bridge_id, OBJ_KEY);
460 }
461
462
463 /*!
464  * \brief In addition to running ao2_cleanup(), this function also removes the
465  * object from the app_controls container.
466  */
467 static void control_unlink(struct stasis_app_control *control)
468 {
469         if (!control) {
470                 return;
471         }
472
473         ao2_unlink_flags(app_controls, control,
474                 OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA);
475         ao2_cleanup(control);
476 }
477
478 struct ast_bridge *stasis_app_bridge_create(const char *type)
479 {
480         struct ast_bridge *bridge;
481         int capabilities, flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
482                 | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
483                 | AST_BRIDGE_FLAG_TRANSFER_PROHIBITED;
484
485         if (ast_strlen_zero(type) || !strcmp(type, "mixing")) {
486                 capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
487                         AST_BRIDGE_CAPABILITY_MULTIMIX |
488                         AST_BRIDGE_CAPABILITY_NATIVE;
489                 flags |= AST_BRIDGE_FLAG_SMART;
490         } else if (!strcmp(type, "holding")) {
491                 capabilities = AST_BRIDGE_CAPABILITY_HOLDING;
492         } else {
493                 return NULL;
494         }
495
496         bridge = ast_bridge_base_new(capabilities, flags);
497         if (bridge) {
498                 ao2_link(app_bridges, bridge);
499         }
500         return bridge;
501 }
502
503 void stasis_app_bridge_destroy(const char *bridge_id)
504 {
505         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
506         if (!bridge) {
507                 return;
508         }
509         ao2_unlink(app_bridges, bridge);
510         ast_bridge_destroy(bridge, 0);
511 }
512
513 static int send_start_msg(struct app *app, struct ast_channel *chan,
514         int argc, char *argv[])
515 {
516         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
517         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
518
519         struct ast_json *json_args;
520         int i;
521
522         ast_assert(chan != NULL);
523
524         /* Set channel info */
525         snapshot = ast_channel_snapshot_create(chan);
526         if (!snapshot) {
527                 return -1;
528         }
529
530         msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
531                 "type", "StasisStart",
532                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
533                 "args",
534                 "channel", ast_channel_snapshot_to_json(snapshot));
535         if (!msg) {
536                 return -1;
537         }
538
539         /* Append arguments to args array */
540         json_args = ast_json_object_get(msg, "args");
541         ast_assert(json_args != NULL);
542         for (i = 0; i < argc; ++i) {
543                 int r = ast_json_array_append(json_args,
544                                               ast_json_string_create(argv[i]));
545                 if (r != 0) {
546                         ast_log(LOG_ERROR, "Error appending start message\n");
547                         return -1;
548                 }
549         }
550
551         app_send(app, msg);
552         return 0;
553 }
554
555 static int send_end_msg(struct app *app, struct ast_channel *chan)
556 {
557         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
558         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
559
560         ast_assert(chan != NULL);
561
562         /* Set channel info */
563         snapshot = ast_channel_snapshot_create(chan);
564         if (snapshot == NULL) {
565                 return -1;
566         }
567
568         msg = ast_json_pack("{s: s, s: o, s: o}",
569                 "type", "StasisEnd",
570                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
571                 "channel", ast_channel_snapshot_to_json(snapshot));
572         if (!msg) {
573                 return -1;
574         }
575
576         app_send(app, msg);
577         return 0;
578 }
579
580 void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
581 {
582         while (!control_is_done(control)) {
583                 int command_count = control_dispatch_all(control, chan);
584                 if (command_count == 0 || ast_channel_fdno(chan) == -1) {
585                         break;
586                 }
587         }
588 }
589
590 /*! /brief Stasis dialplan application callback */
591 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
592                     char *argv[])
593 {
594         SCOPED_MODULE_USE(ast_module_info->self);
595
596         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
597         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
598         int res = 0;
599
600         ast_assert(chan != NULL);
601
602         app = ao2_find(apps_registry, app_name, OBJ_KEY);
603         if (!app) {
604                 ast_log(LOG_ERROR,
605                         "Stasis app '%s' not registered\n", app_name);
606                 return -1;
607         }
608         if (!app_is_active(app)) {
609                 ast_log(LOG_ERROR,
610                         "Stasis app '%s' not active\n", app_name);
611                 return -1;
612         }
613
614         control = control_create(chan);
615         if (!control) {
616                 ast_log(LOG_ERROR, "Allocated failed\n");
617                 return -1;
618         }
619         ao2_link(app_controls, control);
620
621         res = send_start_msg(app, chan, argc, argv);
622         if (res != 0) {
623                 ast_log(LOG_ERROR,
624                         "Error sending start message to '%s'\n", app_name);
625                 return -1;
626         }
627
628         res = app_subscribe_channel(app, chan);
629         if (res != 0) {
630                 ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
631                         app_name, ast_channel_name(chan));
632                 return -1;
633         }
634
635         while (!control_is_done(control)) {
636                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
637                 int r;
638                 int command_count;
639                 struct ast_bridge *last_bridge = NULL;
640                 struct ast_bridge *bridge = NULL;
641
642                 /* Check to see if a bridge absorbed our hangup frame */
643                 if (ast_check_hangup_locked(chan)) {
644                         break;
645                 }
646
647                 last_bridge = bridge;
648                 bridge = stasis_app_get_bridge(control);
649
650                 if (bridge != last_bridge) {
651                         app_unsubscribe_bridge(app, last_bridge);
652                         app_subscribe_bridge(app, bridge);
653                 }
654
655                 if (bridge) {
656                         /* Bridge is handling channel frames */
657                         control_wait(control);
658                         control_dispatch_all(control, chan);
659                         continue;
660                 }
661
662                 r = ast_waitfor(chan, MAX_WAIT_MS);
663
664                 if (r < 0) {
665                         ast_debug(3, "%s: Poll error\n",
666                                   ast_channel_uniqueid(chan));
667                         break;
668                 }
669
670                 command_count = control_dispatch_all(control, chan);
671
672                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
673                         /* Command drained the channel; wait for next frame */
674                         continue;
675                 }
676
677                 if (r == 0) {
678                         /* Timeout */
679                         continue;
680                 }
681
682                 f = ast_read(chan);
683                 if (!f) {
684                         /* Continue on in the dialplan */
685                         ast_debug(3, "%s: Hangup (no more frames)\n",
686                                 ast_channel_uniqueid(chan));
687                         break;
688                 }
689
690                 if (f->frametype == AST_FRAME_CONTROL) {
691                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
692                                 /* Continue on in the dialplan */
693                                 ast_debug(3, "%s: Hangup\n",
694                                         ast_channel_uniqueid(chan));
695                                 break;
696                         }
697                 }
698         }
699
700         app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
701         app_unsubscribe_channel(app, chan);
702
703         res = send_end_msg(app, chan);
704         if (res != 0) {
705                 ast_log(LOG_ERROR,
706                         "Error sending end message to %s\n", app_name);
707                 return res;
708         }
709
710         /* There's an off chance that app is ready for cleanup. Go ahead
711          * and clean up, just in case
712          */
713         cleanup();
714
715         return res;
716 }
717
718 int stasis_app_send(const char *app_name, struct ast_json *message)
719 {
720         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
721
722         app = ao2_find(apps_registry, app_name, OBJ_KEY);
723
724         if (!app) {
725                 /* XXX We can do a better job handling late binding, queueing up
726                  * the call for a few seconds to wait for the app to register.
727                  */
728                 ast_log(LOG_WARNING,
729                         "Stasis app '%s' not registered\n", app_name);
730                 return -1;
731         }
732
733         app_send(app, message);
734         return 0;
735 }
736
737 static int append_name(void *obj, void *arg, int flags)
738 {
739         struct app *app = obj;
740         struct ao2_container *apps = arg;
741
742         ast_str_container_add(apps, app_name(app));
743         return 0;
744 }
745
746 struct ao2_container *stasis_app_get_all(void)
747 {
748         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
749
750         apps = ast_str_container_alloc(1);
751         if (!apps) {
752                 return NULL;
753         }
754
755         ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
756
757         return ao2_bump(apps);
758 }
759
760 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
761 {
762         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
763
764         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
765
766         app = ao2_find(apps_registry, app_name, OBJ_KEY | OBJ_NOLOCK);
767
768         if (app) {
769                 app_update(app, handler, data);
770         } else {
771                 app = app_create(app_name, handler, data);
772                 if (app) {
773                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
774                 } else {
775                         return -1;
776                 }
777         }
778
779         /* We lazily clean up the apps_registry, because it's good enough to
780          * prevent memory leaks, and we're lazy.
781          */
782         cleanup();
783         return 0;
784 }
785
786 void stasis_app_unregister(const char *app_name)
787 {
788         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
789
790         if (!app_name) {
791                 return;
792         }
793
794         app = ao2_find(apps_registry, app_name, OBJ_KEY);
795         if (!app) {
796                 ast_log(LOG_ERROR,
797                         "Stasis app '%s' not registered\n", app_name);
798                 return;
799         }
800
801         app_deactivate(app);
802
803         /* There's a decent chance that app is ready for cleanup. Go ahead
804          * and clean up, just in case
805          */
806         cleanup();
807 }
808
809 struct ast_json *stasis_app_to_json(const char *app_name)
810 {
811         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
812
813         if (app_name) {
814                 app = ao2_find(apps_registry, app_name, OBJ_KEY);
815         }
816
817         if (!app) {
818                 return NULL;
819         }
820
821         return app_to_json(app);
822 }
823
824 #define CHANNEL_SCHEME "channel:"
825 #define BRIDGE_SCHEME "bridge:"
826 #define ENDPOINT_SCHEME "endpoint:"
827
828 /*! Struct for capturing event source information */
829 struct event_source {
830         enum {
831                 EVENT_SOURCE_CHANNEL,
832                 EVENT_SOURCE_BRIDGE,
833                 EVENT_SOURCE_ENDPOINT,
834         } event_source_type;
835         union {
836                 struct ast_channel *channel;
837                 struct ast_bridge *bridge;
838                 struct ast_endpoint *endpoint;
839         };
840 };
841
842 enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
843         const char **event_source_uris, int event_sources_count,
844         struct ast_json **json)
845 {
846         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
847         RAII_VAR(struct event_source *, event_sources, NULL, ast_free);
848         enum stasis_app_subscribe_res res = STASIS_ASR_OK;
849         int i;
850
851         if (app_name) {
852                 app = ao2_find(apps_registry, app_name, OBJ_KEY);
853         }
854
855         if (!app) {
856                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
857                         app_name ? : "(null)");
858                 return STASIS_ASR_APP_NOT_FOUND;
859         }
860
861         event_sources = ast_calloc(event_sources_count, sizeof(*event_sources));
862         if (!event_sources) {
863                 return STASIS_ASR_INTERNAL_ERROR;
864         }
865
866         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
867                 const char *uri = event_source_uris[i];
868                 ast_debug(3, "%s: Checking %s\n", app_name,
869                         uri);
870                 if (ast_begins_with(uri, CHANNEL_SCHEME)) {
871                         event_sources[i].event_source_type =
872                                 EVENT_SOURCE_CHANNEL;
873                         event_sources[i].channel = ast_channel_get_by_name(
874                                 uri + strlen(CHANNEL_SCHEME));
875                         if (!event_sources[i].channel) {
876                                 ast_log(LOG_WARNING, "Channel not found: %s\n", uri);
877                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
878                         }
879                 } else if (ast_begins_with(uri, BRIDGE_SCHEME)) {
880                         event_sources[i].event_source_type =
881                                 EVENT_SOURCE_BRIDGE;
882                         event_sources[i].bridge = stasis_app_bridge_find_by_id(
883                                 uri + strlen(BRIDGE_SCHEME));
884                         if (!event_sources[i].bridge) {
885                                 ast_log(LOG_WARNING, "Bridge not found: %s\n", uri);
886                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
887                         }
888                 } else if (ast_begins_with(uri, ENDPOINT_SCHEME)) {
889                         event_sources[i].event_source_type =
890                                 EVENT_SOURCE_ENDPOINT;
891                         event_sources[i].endpoint = ast_endpoint_find_by_id(
892                                 uri + strlen(ENDPOINT_SCHEME));
893                         if (!event_sources[i].endpoint) {
894                                 ast_log(LOG_WARNING, "Endpoint not found: %s\n", uri);
895                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
896                         }
897                 } else {
898                         ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
899                         res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
900                 }
901         }
902
903         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
904                 int sub_res = -1;
905                 ast_debug(1, "%s: Subscribing to %s\n", app_name,
906                         event_source_uris[i]);
907
908                 switch (event_sources[i].event_source_type) {
909                 case EVENT_SOURCE_CHANNEL:
910                         sub_res = app_subscribe_channel(app,
911                                 event_sources[i].channel);
912                         break;
913                 case EVENT_SOURCE_BRIDGE:
914                         sub_res = app_subscribe_bridge(app,
915                                 event_sources[i].bridge);
916                         break;
917                 case EVENT_SOURCE_ENDPOINT:
918                         sub_res = app_subscribe_endpoint(app,
919                                 event_sources[i].endpoint);
920                         break;
921                 }
922
923                 if (sub_res != 0) {
924                         ast_log(LOG_WARNING,
925                                 "Error subscribing app '%s' to '%s'\n",
926                                 app_name, event_source_uris[i]);
927                         res = STASIS_ASR_INTERNAL_ERROR;
928                 }
929         }
930
931         if (res == STASIS_ASR_OK && json) {
932                 ast_debug(1, "%s: Successful; setting results\n", app_name);
933                 *json = app_to_json(app);
934         }
935
936         for (i = 0; i < event_sources_count; ++i) {
937                 switch (event_sources[i].event_source_type) {
938                 case EVENT_SOURCE_CHANNEL:
939                         event_sources[i].channel =
940                                 ast_channel_cleanup(event_sources[i].channel);
941                         break;
942                 case EVENT_SOURCE_BRIDGE:
943                         ao2_cleanup(event_sources[i].bridge);
944                         event_sources[i].bridge = NULL;
945                         break;
946                 case EVENT_SOURCE_ENDPOINT:
947                         ao2_cleanup(event_sources[i].endpoint);
948                         event_sources[i].endpoint = NULL;
949                         break;
950                 }
951         }
952
953         return res;
954 }
955
956 enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
957         const char **event_source_uris, int event_sources_count,
958         struct ast_json **json)
959 {
960         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
961         enum stasis_app_subscribe_res res = STASIS_ASR_OK;
962         int i;
963
964         if (app_name) {
965                 ast_log(LOG_WARNING, "Could not find app '%s'\n",
966                         app_name ? : "(null)");
967                 app = ao2_find(apps_registry, app_name, OBJ_KEY);
968         }
969
970         if (!app) {
971                 return STASIS_ASR_APP_NOT_FOUND;
972         }
973
974         /* Validate the input */
975         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
976                 if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
977                         const char *channel_id = event_source_uris[i] +
978                                 strlen(CHANNEL_SCHEME);
979                         if (!app_is_subscribed_channel_id(app, channel_id)) {
980                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
981                         }
982                 } else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
983                         const char *bridge_id = event_source_uris[i] +
984                                 strlen(BRIDGE_SCHEME);
985                         if (!app_is_subscribed_bridge_id(app, bridge_id)) {
986                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
987                         }
988                 } else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
989                         const char *endpoint_id = event_source_uris[i] +
990                                 strlen(ENDPOINT_SCHEME);
991                         if (!app_is_subscribed_endpoint_id(app, endpoint_id)) {
992                                 res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
993                         }
994                 } else {
995                         res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
996                 }
997         }
998
999         for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
1000                 if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
1001                         const char *channel_id = event_source_uris[i] +
1002                                 strlen(CHANNEL_SCHEME);
1003                         app_unsubscribe_channel_id(app, channel_id);
1004                 } else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
1005                         const char *bridge_id = event_source_uris[i] +
1006                                 strlen(BRIDGE_SCHEME);
1007                         app_unsubscribe_bridge_id(app, bridge_id);
1008                 } else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
1009                         const char *endpoint_id = event_source_uris[i] +
1010                                 strlen(ENDPOINT_SCHEME);
1011                         app_unsubscribe_endpoint_id(app, endpoint_id);
1012                 }
1013         }
1014
1015         if (res == STASIS_ASR_OK && json) {
1016                 *json = app_to_json(app);
1017         }
1018
1019         return res;
1020 }
1021
1022 void stasis_app_ref(void)
1023 {
1024         ast_module_ref(ast_module_info->self);
1025 }
1026
1027 void stasis_app_unref(void)
1028 {
1029         ast_module_unref(ast_module_info->self);
1030 }
1031
1032 static int load_module(void)
1033 {
1034         apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
1035                 app_compare);
1036         if (apps_registry == NULL) {
1037                 return AST_MODULE_LOAD_FAILURE;
1038         }
1039
1040         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
1041                 control_compare);
1042         if (app_controls == NULL) {
1043                 return AST_MODULE_LOAD_FAILURE;
1044         }
1045
1046         app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
1047                 bridges_compare);
1048
1049         app_bridges_moh = ao2_container_alloc_hash(
1050                 AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
1051                 37, bridges_moh_hash_fn, bridges_moh_sort_fn, NULL);
1052
1053         if (!app_bridges_moh) {
1054                 return AST_MODULE_LOAD_FAILURE;
1055         }
1056
1057         return AST_MODULE_LOAD_SUCCESS;
1058 }
1059
1060 static int unload_module(void)
1061 {
1062         ao2_cleanup(apps_registry);
1063         apps_registry = NULL;
1064
1065         ao2_cleanup(app_controls);
1066         app_controls = NULL;
1067
1068         ao2_cleanup(app_bridges);
1069         app_bridges = NULL;
1070
1071         ao2_cleanup(app_bridges_moh);
1072         app_bridges_moh = NULL;
1073
1074         return 0;
1075 }
1076
1077 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
1078         .load = load_module,
1079         .unload = unload_module,
1080         );