Stasis-HTTP: Flesh out bridge-related capabilities
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  *
25  * <code>res_stasis.so</code> brings together the various components of the
26  * Stasis application infrastructure.
27  *
28  * First, there's the Stasis application handler, stasis_app_exec(). This is
29  * called by <code>app_stasis.so</code> to give control of a channel to the
30  * Stasis application code from the dialplan.
31  *
32  * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
33  * object, which may be used to control the channel.
34  *
35  * To control the channel, commands may be sent to channel using
36  * stasis_app_send_command() and stasis_app_send_async_command().
37  *
38  * Alongside this, applications may be registered/unregistered using
39  * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
40  * events received on the channel's topic are converted to JSON and forwarded to
41  * the \ref stasis_app_cb. The application may also subscribe to the channel to
42  * continue to receive messages even after the channel has left Stasis, but it
43  * will not be able to control it.
44  *
45  * Given all the stuff that comes together in this module, it's been broken up
46  * into several pieces that are in <code>res/stasis/</code> and compiled into
47  * <code>res_stasis.so</code>.
48  */
49
50 /*** MODULEINFO
51         <depend>res_stasis_json_events</depend>
52         <support_level>core</support_level>
53  ***/
54
55 #include "asterisk.h"
56
57 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
58
59 #include "asterisk/astobj2.h"
60 #include "asterisk/callerid.h"
61 #include "asterisk/module.h"
62 #include "asterisk/stasis_app_impl.h"
63 #include "asterisk/stasis_channels.h"
64 #include "asterisk/stasis_bridging.h"
65 #include "asterisk/stasis_message_router.h"
66 #include "asterisk/strings.h"
67 #include "stasis/app.h"
68 #include "stasis/control.h"
69 #include "stasis_json/resource_events.h"
70
71 /*! Time to wait for a frame in the application */
72 #define MAX_WAIT_MS 200
73
74 /*!
75  * \brief Number of buckets for the Stasis application hash table.  Remember to
76  * keep it a prime number!
77  */
78 #define APPS_NUM_BUCKETS 127
79
80 /*!
81  * \brief Number of buckets for the Stasis application hash table.  Remember to
82  * keep it a prime number!
83  */
84 #define CONTROLS_NUM_BUCKETS 127
85
86 /*!
87  * \brief Stasis application container.
88  */
89 struct ao2_container *apps_registry;
90
91 struct ao2_container *app_controls;
92
93 struct ao2_container *app_bridges;
94
95 /*! \brief Message router for the channel caching topic */
96 struct stasis_message_router *channel_router;
97
98 /*! \brief Message router for the bridge caching topic */
99 struct stasis_message_router *bridge_router;
100
101 /*! AO2 hash function for \ref app */
102 static int app_hash(const void *obj, const int flags)
103 {
104         const struct app *app = obj;
105         const char *name = flags & OBJ_KEY ? obj : app_name(app);
106
107         return ast_str_hash(name);
108 }
109
110 /*! AO2 comparison function for \ref app */
111 static int app_compare(void *lhs, void *rhs, int flags)
112 {
113         const struct app *lhs_app = lhs;
114         const struct app *rhs_app = rhs;
115         const char *lhs_name = app_name(lhs_app);
116         const char *rhs_name = flags & OBJ_KEY ? rhs : app_name(rhs_app);
117
118         if (strcmp(lhs_name, rhs_name) == 0) {
119                 return CMP_MATCH | CMP_STOP;
120         } else {
121                 return 0;
122         }
123 }
124
125 /*! AO2 hash function for \ref stasis_app_control */
126 static int control_hash(const void *obj, const int flags)
127 {
128         const struct stasis_app_control *control = obj;
129         const char *id = flags & OBJ_KEY ?
130                 obj : stasis_app_control_get_channel_id(control);
131
132         return ast_str_hash(id);
133 }
134
135 /*! AO2 comparison function for \ref stasis_app_control */
136 static int control_compare(void *lhs, void *rhs, int flags)
137 {
138         const struct stasis_app_control *lhs_control = lhs;
139         const struct stasis_app_control *rhs_control = rhs;
140         const char *lhs_id = stasis_app_control_get_channel_id(lhs_control);
141         const char *rhs_id = flags & OBJ_KEY ?
142                 rhs : stasis_app_control_get_channel_id(rhs_control);
143
144         if (strcmp(lhs_id, rhs_id) == 0) {
145                 return CMP_MATCH | CMP_STOP;
146         } else {
147                 return 0;
148         }
149 }
150
151 struct stasis_app_control *stasis_app_control_find_by_channel(
152         const struct ast_channel *chan)
153 {
154         if (chan == NULL) {
155                 return NULL;
156         }
157
158         return stasis_app_control_find_by_channel_id(
159                 ast_channel_uniqueid(chan));
160 }
161
162 struct stasis_app_control *stasis_app_control_find_by_channel_id(
163         const char *channel_id)
164 {
165         return ao2_find(app_controls, channel_id, OBJ_KEY);
166 }
167
168 /*! AO2 hash function for bridges container  */
169 static int bridges_hash(const void *obj, const int flags)
170 {
171         const struct ast_bridge *bridge = obj;
172         const char *id = flags & OBJ_KEY ?
173                 obj : bridge->uniqueid;
174
175         return ast_str_hash(id);
176 }
177
178 /*! AO2 comparison function for bridges container */
179 static int bridges_compare(void *lhs, void *rhs, int flags)
180 {
181         const struct ast_bridge *lhs_bridge = lhs;
182         const struct ast_bridge *rhs_bridge = rhs;
183         const char *lhs_id = lhs_bridge->uniqueid;
184         const char *rhs_id = flags & OBJ_KEY ?
185                 rhs : rhs_bridge->uniqueid;
186
187         if (strcmp(lhs_id, rhs_id) == 0) {
188                 return CMP_MATCH | CMP_STOP;
189         } else {
190                 return 0;
191         }
192 }
193
194 struct ast_bridge *stasis_app_bridge_find_by_id(
195         const char *bridge_id)
196 {
197         return ao2_find(app_bridges, bridge_id, OBJ_KEY);
198 }
199
200 /*! \brief Typedef for blob handler callbacks */
201 typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
202
203 /*! \brief Callback to check whether an app is watching a given channel */
204 static int app_watching_channel_cb(void *obj, void *arg, int flags)
205 {
206         struct app *app = obj;
207         char *uniqueid = arg;
208
209         return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
210 }
211
212 /*! \brief Get a container full of apps that are interested in the specified channel */
213 static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
214 {
215         struct ao2_container *watching_apps;
216         char *uniqueid_dup;
217         RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
218         ast_assert(uniqueid != NULL);
219
220         uniqueid_dup = ast_strdupa(uniqueid);
221
222         watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
223         watching_apps = watching_apps_iter->c;
224
225         if (!ao2_container_count(watching_apps)) {
226                 return NULL;
227         }
228
229         ao2_ref(watching_apps, +1);
230         return watching_apps_iter->c;
231 }
232
233 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
234 typedef struct ast_json *(*channel_snapshot_monitor)(
235         struct ast_channel_snapshot *old_snapshot,
236         struct ast_channel_snapshot *new_snapshot);
237
238 /*! \brief Handle channel state changes */
239 static struct ast_json *channel_state(
240         struct ast_channel_snapshot *old_snapshot,
241         struct ast_channel_snapshot *new_snapshot)
242 {
243         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
244         struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
245
246         if (!old_snapshot) {
247                 return stasis_json_event_channel_created_create(snapshot);
248         } else if (!new_snapshot) {
249                 json = ast_json_pack("{s: i, s: s}",
250                         "cause", snapshot->hangupcause,
251                         "cause_txt", ast_cause2str(snapshot->hangupcause));
252                 if (!json) {
253                         return NULL;
254                 }
255                 return stasis_json_event_channel_destroyed_create(snapshot, json);
256         } else if (old_snapshot->state != new_snapshot->state) {
257                 return stasis_json_event_channel_state_change_create(snapshot);
258         }
259
260         return NULL;
261 }
262
263 static struct ast_json *channel_dialplan(
264         struct ast_channel_snapshot *old_snapshot,
265         struct ast_channel_snapshot *new_snapshot)
266 {
267         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
268
269         /* No Newexten event on cache clear */
270         if (!new_snapshot) {
271                 return NULL;
272         }
273
274         /* Empty application is not valid for a Newexten event */
275         if (ast_strlen_zero(new_snapshot->appl)) {
276                 return NULL;
277         }
278
279         if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
280                 return NULL;
281         }
282
283         json = ast_json_pack("{s: s, s: s}",
284                 "application", new_snapshot->appl,
285                 "application_data", new_snapshot->data);
286         if (!json) {
287                 return NULL;
288         }
289
290         return stasis_json_event_channel_dialplan_create(new_snapshot, json);
291 }
292
293 static struct ast_json *channel_callerid(
294         struct ast_channel_snapshot *old_snapshot,
295         struct ast_channel_snapshot *new_snapshot)
296 {
297         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
298
299         /* No NewCallerid event on cache clear or first event */
300         if (!old_snapshot || !new_snapshot) {
301                 return NULL;
302         }
303
304         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
305                 return NULL;
306         }
307
308         json = ast_json_pack("{s: i, s: s}",
309                 "caller_presentation", new_snapshot->caller_pres,
310                 "caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
311         if (!json) {
312                 return NULL;
313         }
314
315         return stasis_json_event_channel_caller_id_create(new_snapshot, json);
316 }
317
318 static struct ast_json *channel_snapshot(
319         struct ast_channel_snapshot *old_snapshot,
320         struct ast_channel_snapshot *new_snapshot)
321 {
322         if (!new_snapshot) {
323                 return NULL;
324         }
325
326         return stasis_json_event_channel_snapshot_create(new_snapshot);
327 }
328
329 channel_snapshot_monitor channel_monitors[] = {
330         channel_snapshot,
331         channel_state,
332         channel_dialplan,
333         channel_callerid
334 };
335
336 static int app_send_cb(void *obj, void *arg, int flags)
337 {
338         struct app *app = obj;
339         struct ast_json *msg = arg;
340
341         app_send(app, msg);
342         return 0;
343 }
344
345 static void sub_channel_snapshot_handler(void *data,
346                 struct stasis_subscription *sub,
347                 struct stasis_topic *topic,
348                 struct stasis_message *message)
349 {
350         RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
351         struct stasis_cache_update *update = stasis_message_data(message);
352         struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
353         struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
354         int i;
355
356         watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
357         if (!watching_apps) {
358                 return;
359         }
360
361         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
362                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
363
364                 msg = channel_monitors[i](old_snapshot, new_snapshot);
365                 if (msg) {
366                         ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
367                 }
368         }
369 }
370
371 static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
372 {
373         ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
374 }
375
376 static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb)
377 {
378         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
379         RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
380
381         if (!obj->snapshot) {
382                 return;
383         }
384
385         watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
386         if (!watching_apps) {
387                 return;
388         }
389
390         msg = handler_cb(obj);
391         if (!msg) {
392                 return;
393         }
394
395         distribute_message(watching_apps, msg);
396 }
397
398 /*!
399  * \brief In addition to running ao2_cleanup(), this function also removes the
400  * object from the app_controls container.
401  */
402 static void control_unlink(struct stasis_app_control *control)
403 {
404         if (!control) {
405                 return;
406         }
407
408         ao2_unlink_flags(app_controls, control,
409                 OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA);
410         ao2_cleanup(control);
411 }
412
413 struct ast_bridge *stasis_app_bridge_create(const char *type)
414 {
415         struct ast_bridge *bridge;
416         int capabilities, flags = 0;
417         if (ast_strlen_zero(type) || !strcmp(type, "mixing")) {
418                 capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
419                         AST_BRIDGE_CAPABILITY_MULTIMIX |
420                         AST_BRIDGE_CAPABILITY_NATIVE;
421                 flags = AST_BRIDGE_FLAG_SMART;
422         } else if (!strcmp(type, "holding")) {
423                 capabilities = AST_BRIDGE_CAPABILITY_HOLDING;
424         } else {
425                 return NULL;
426         }
427
428         bridge = ast_bridge_base_new(capabilities, flags);
429         if (bridge) {
430                 ao2_link(app_bridges, bridge);
431         }
432         return bridge;
433 }
434
435 void stasis_app_bridge_destroy(const char *bridge_id)
436 {
437         struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
438         if (!bridge) {
439                 return;
440         }
441         ao2_unlink(app_bridges, bridge);
442         ast_bridge_destroy(bridge);
443 }
444
445 int app_send_start_msg(struct app *app, struct ast_channel *chan,
446         int argc, char *argv[])
447 {
448         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
449         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
450         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
451
452         struct ast_json *json_args;
453         int i;
454
455         ast_assert(chan != NULL);
456
457         /* Set channel info */
458         snapshot = ast_channel_snapshot_create(chan);
459         if (!snapshot) {
460                 return -1;
461         }
462
463         blob = ast_json_pack("{s: []}", "args");
464         if (!blob) {
465                 return -1;
466         }
467
468         /* Append arguments to args array */
469         json_args = ast_json_object_get(blob, "args");
470         ast_assert(json_args != NULL);
471         for (i = 0; i < argc; ++i) {
472                 int r = ast_json_array_append(json_args,
473                                               ast_json_string_create(argv[i]));
474                 if (r != 0) {
475                         ast_log(LOG_ERROR, "Error appending start message\n");
476                         return -1;
477                 }
478         }
479
480         msg = stasis_json_event_stasis_start_create(snapshot, blob);
481         if (!msg) {
482                 return -1;
483         }
484
485         app_send(app, msg);
486         return 0;
487 }
488
489 int app_send_end_msg(struct app *app, struct ast_channel *chan)
490 {
491         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
492         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
493
494         ast_assert(chan != NULL);
495
496         /* Set channel info */
497         snapshot = ast_channel_snapshot_create(chan);
498         if (snapshot == NULL) {
499                 return -1;
500         }
501
502         msg = stasis_json_event_stasis_end_create(snapshot);
503         if (!msg) {
504                 return -1;
505         }
506
507         app_send(app, msg);
508         return 0;
509 }
510
511 /*! /brief Stasis dialplan application callback */
512 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
513                     char *argv[])
514 {
515         SCOPED_MODULE_USE(ast_module_info->self);
516
517         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
518         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
519         int res = 0;
520
521         ast_assert(chan != NULL);
522
523         app = ao2_find(apps_registry, app_name, OBJ_KEY);
524         if (!app) {
525                 ast_log(LOG_ERROR,
526                         "Stasis app '%s' not registered\n", app_name);
527                 return -1;
528         }
529
530         control = control_create(chan);
531         if (!control) {
532                 ast_log(LOG_ERROR, "Allocated failed\n");
533                 return -1;
534         }
535         ao2_link(app_controls, control);
536
537         res = app_send_start_msg(app, chan, argc, argv);
538         if (res != 0) {
539                 ast_log(LOG_ERROR,
540                         "Error sending start message to %s\n", app_name);
541                 return res;
542         }
543
544         if (app_add_channel(app, chan)) {
545                 ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
546                 return -1;
547         }
548
549         while (!control_is_done(control)) {
550                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
551                 int r;
552                 int command_count;
553
554                 r = ast_waitfor(chan, MAX_WAIT_MS);
555
556                 if (r < 0) {
557                         ast_debug(3, "%s: Poll error\n",
558                                   ast_channel_uniqueid(chan));
559                         break;
560                 }
561
562                 command_count = control_dispatch_all(control, chan);
563
564                 if (command_count > 0 && ast_channel_fdno(chan) == -1) {
565                         /* Command drained the channel; wait for next frame */
566                         continue;
567                 }
568
569                 if (r == 0) {
570                         /* Timeout */
571                         continue;
572                 }
573
574                 f = ast_read(chan);
575                 if (!f) {
576                         ast_debug(3,
577                                 "%s: No more frames. Must be done, I guess.\n",
578                                 ast_channel_uniqueid(chan));
579                         break;
580                 }
581
582                 switch (f->frametype) {
583                 case AST_FRAME_CONTROL:
584                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
585                                 /* Continue on in the dialplan */
586                                 ast_debug(3, "%s: Hangup\n",
587                                         ast_channel_uniqueid(chan));
588                                 control_continue(control);
589                         }
590                         break;
591                 default:
592                         /* Not handled; discard */
593                         break;
594                 }
595         }
596
597         app_remove_channel(app, chan);
598         res = app_send_end_msg(app, chan);
599         if (res != 0) {
600                 ast_log(LOG_ERROR,
601                         "Error sending end message to %s\n", app_name);
602                 return res;
603         }
604
605         return res;
606 }
607
608 int stasis_app_send(const char *app_name, struct ast_json *message)
609 {
610         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
611
612         app = ao2_find(apps_registry, app_name, OBJ_KEY);
613
614         if (!app) {
615                 /* XXX We can do a better job handling late binding, queueing up
616                  * the call for a few seconds to wait for the app to register.
617                  */
618                 ast_log(LOG_WARNING,
619                         "Stasis app '%s' not registered\n", app_name);
620                 return -1;
621         }
622
623         app_send(app, message);
624         return 0;
625 }
626
627 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
628 {
629         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
630
631         SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
632
633         app = ao2_find(apps_registry, app_name, OBJ_KEY | OBJ_NOLOCK);
634
635         if (app) {
636                 RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
637                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
638
639                 blob = ast_json_pack("{s: s}", "application", app_name);
640                 if (blob) {
641                         msg = stasis_json_event_application_replaced_create(blob);
642                         if (msg) {
643                                 app_send(app, msg);
644                         }
645                 }
646
647                 app_update(app, handler, data);
648         } else {
649                 app = app_create(app_name, handler, data);
650                 if (app) {
651                         ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
652                 } else {
653                         return -1;
654                 }
655         }
656
657         return 0;
658 }
659
660 void stasis_app_unregister(const char *app_name)
661 {
662         if (app_name) {
663                 ao2_cleanup(ao2_find(
664                                 apps_registry, app_name, OBJ_KEY | OBJ_UNLINK));
665         }
666 }
667
668 static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj)
669 {
670         RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
671         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
672         const char *direction;
673
674         /* To simplify events, we'll only generate on receive */
675         direction = ast_json_string_get(
676                 ast_json_object_get(obj->blob, "direction"));
677
678         if (strcmp("Received", direction) != 0) {
679                 return NULL;
680         }
681
682         extra = ast_json_pack(
683                 "{s: o}",
684                 "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
685         if (!extra) {
686                 return NULL;
687         }
688
689         return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra);
690 }
691
692 /* To simplify events, we'll only generate on DTMF end (dtmf_end type) */
693 static void sub_dtmf_handler(void *data,
694                 struct stasis_subscription *sub,
695                 struct stasis_topic *topic,
696                 struct stasis_message *message)
697 {
698         struct ast_channel_blob *obj = stasis_message_data(message);
699         generic_blob_handler(obj, handle_blob_dtmf);
700 }
701
702 static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj)
703 {
704         return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob);
705 }
706
707 static void sub_userevent_handler(void *data,
708                 struct stasis_subscription *sub,
709                 struct stasis_topic *topic,
710                 struct stasis_message *message)
711 {
712         struct ast_channel_blob *obj = stasis_message_data(message);
713         generic_blob_handler(obj, handle_blob_userevent);
714 }
715
716 static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj)
717 {
718         return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob);
719 }
720
721 static void sub_hangup_request_handler(void *data,
722                 struct stasis_subscription *sub,
723                 struct stasis_topic *topic,
724                 struct stasis_message *message)
725 {
726         struct ast_channel_blob *obj = stasis_message_data(message);
727         generic_blob_handler(obj, handle_blob_hangup_request);
728 }
729
730 static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj)
731 {
732         return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob);
733 }
734
735 static void sub_varset_handler(void *data,
736                 struct stasis_subscription *sub,
737                 struct stasis_topic *topic,
738                 struct stasis_message *message)
739 {
740         struct ast_channel_blob *obj = stasis_message_data(message);
741         generic_blob_handler(obj, handle_blob_varset);
742 }
743
744 void stasis_app_ref(void)
745 {
746         ast_module_ref(ast_module_info->self);
747 }
748
749 void stasis_app_unref(void)
750 {
751         ast_module_unref(ast_module_info->self);
752 }
753
754 /*! \brief Callback to check whether an app is watching a given bridge */
755 static int app_watching_bridge_cb(void *obj, void *arg, int flags)
756 {
757         struct app *app = obj;
758         char *uniqueid = arg;
759
760         return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
761 }
762
763 /*! \brief Get a container full of apps that are interested in the specified bridge */
764 static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
765 {
766         struct ao2_container *watching_apps;
767         char *uniqueid_dup;
768         RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
769         ast_assert(uniqueid != NULL);
770
771         uniqueid_dup = ast_strdupa(uniqueid);
772
773         watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
774         watching_apps = watching_apps_iter->c;
775
776         if (!ao2_container_count(watching_apps)) {
777                 return NULL;
778         }
779
780         ao2_ref(watching_apps, +1);
781         return watching_apps_iter->c;
782 }
783
784 /*! Callback used to remove an app's interest in a bridge */
785 static int remove_bridge_cb(void *obj, void *arg, int flags)
786 {
787         app_remove_bridge(obj, arg);
788         return 0;
789 }
790
791 static void sub_bridge_snapshot_handler(void *data,
792                 struct stasis_subscription *sub,
793                 struct stasis_topic *topic,
794                 struct stasis_message *message)
795 {
796         RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
797         struct stasis_cache_update *update = stasis_message_data(message);
798         struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
799         struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
800         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
801
802         watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
803         if (!watching_apps || !ao2_container_count(watching_apps)) {
804                 return;
805         }
806
807         if (!new_snapshot) {
808                 RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
809
810                 /* The bridge has gone away. Create the message, make sure no apps are
811                  * watching this bridge anymore, and destroy the bridge's control
812                  * structure */
813                 msg = stasis_json_event_bridge_destroyed_create(old_snapshot);
814                 ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
815                 stasis_app_bridge_destroy(old_snapshot->uniqueid);
816         } else if (!old_snapshot) {
817                 msg = stasis_json_event_bridge_created_create(old_snapshot);
818         }
819
820         if (!msg) {
821                 return;
822         }
823
824         distribute_message(watching_apps, msg);
825 }
826
827 /*! \brief Callback used to merge two containers of applications */
828 static int list_merge_cb(void *obj, void *arg, int flags)
829 {
830         /* remove any current entries for this app */
831         ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
832         /* relink as the only entry */
833         ao2_link(arg, obj);
834         return 0;
835 }
836
837 /*! \brief Merge container src into container dst without modifying src */
838 static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
839 {
840         ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
841 }
842
843 /*! \brief Callback for adding to an app's bridges of interest */
844 static int app_add_bridge_cb(void *obj, void *arg, int flags)
845 {
846         app_add_bridge(obj, arg);
847         return 0;
848 }
849
850 /*! \brief Add interest in the given bridge to all apps in the container */
851 static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
852 {
853         RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
854         ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
855 }
856
857 static void sub_bridge_merge_handler(void *data,
858                 struct stasis_subscription *sub,
859                 struct stasis_topic *topic,
860                 struct stasis_message *message)
861 {
862         RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
863         RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
864         RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
865         struct ast_bridge_merge_message *merge = stasis_message_data(message);
866         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
867         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
868
869         watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
870         if (watching_apps_to) {
871                 update_apps_list(watching_apps_all, watching_apps_to);
872         }
873
874         watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
875         if (watching_apps_from) {
876                 update_bridge_interest(watching_apps_from, merge->to->uniqueid);
877                 update_apps_list(watching_apps_all, watching_apps_from);
878         }
879
880         if (!ao2_container_count(watching_apps_all)) {
881                 return;
882         }
883
884         /* The secondary bridge has to be packed into JSON by hand because the auto-generated
885          * JSON event generator can only handle one instance of a given snapshot type in an
886          * elegant way */
887         blob = ast_json_pack("{s: o}", "bridge_from", ast_bridge_snapshot_to_json(merge->from));
888         if (!blob) {
889                 return;
890         }
891
892         msg = stasis_json_event_bridge_merged_create(merge->to, blob);
893
894         distribute_message(watching_apps_all, msg);
895 }
896
897 static void sub_bridge_enter_handler(void *data,
898                 struct stasis_subscription *sub,
899                 struct stasis_topic *topic,
900                 struct stasis_message *message)
901 {
902         RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
903         RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
904         RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
905         struct ast_bridge_blob *obj = stasis_message_data(message);
906         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
907
908         watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
909         if (watching_apps_bridge) {
910                 update_apps_list(watching_apps_all, watching_apps_bridge);
911         }
912
913         watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
914         if (watching_apps_channel) {
915                 update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
916                 update_apps_list(watching_apps_all, watching_apps_channel);
917         }
918
919         if (!ao2_container_count(watching_apps_all)) {
920                 return;
921         }
922
923         msg = stasis_json_event_channel_entered_bridge_create(obj->bridge, obj->channel);
924
925         distribute_message(watching_apps_all, msg);
926 }
927
928 static void sub_bridge_leave_handler(void *data,
929                 struct stasis_subscription *sub,
930                 struct stasis_topic *topic,
931                 struct stasis_message *message)
932 {
933         RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
934         struct ast_bridge_blob *obj = stasis_message_data(message);
935         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
936
937         watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
938         if (!watching_apps_bridge) {
939                 return;
940         }
941
942         msg = stasis_json_event_channel_left_bridge_create(obj->bridge, obj->channel);
943
944         distribute_message(watching_apps_bridge, msg);
945 }
946
947 static int load_module(void)
948 {
949         int r = 0;
950
951         apps_registry =
952                 ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
953         if (apps_registry == NULL) {
954                 return AST_MODULE_LOAD_FAILURE;
955         }
956
957         app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
958                                              control_hash, control_compare);
959         if (app_controls == NULL) {
960                 return AST_MODULE_LOAD_FAILURE;
961         }
962
963         app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
964                                              bridges_hash, bridges_compare);
965         if (app_bridges == NULL) {
966                 return AST_MODULE_LOAD_FAILURE;
967         }
968
969         channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
970         if (!channel_router) {
971                 return AST_MODULE_LOAD_FAILURE;
972         }
973
974         r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
975         r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL);
976         r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL);
977         r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL);
978         r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL);
979         if (r) {
980                 return AST_MODULE_LOAD_FAILURE;
981         }
982
983         bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached()));
984         if (!bridge_router) {
985                 return AST_MODULE_LOAD_FAILURE;
986         }
987
988         r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
989         r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
990         r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
991         r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
992         if (r) {
993                 return AST_MODULE_LOAD_FAILURE;
994         }
995
996         return AST_MODULE_LOAD_SUCCESS;
997 }
998
999 static int unload_module(void)
1000 {
1001         int r = 0;
1002
1003         stasis_message_router_unsubscribe_and_join(channel_router);
1004         channel_router = NULL;
1005
1006         stasis_message_router_unsubscribe_and_join(bridge_router);
1007         bridge_router = NULL;
1008
1009         ao2_cleanup(apps_registry);
1010         apps_registry = NULL;
1011
1012         ao2_cleanup(app_controls);
1013         app_controls = NULL;
1014
1015         ao2_cleanup(app_bridges);
1016         app_bridges = NULL;
1017
1018         return r;
1019 }
1020
1021 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS,
1022                 "Stasis application support",
1023                 .load = load_module,
1024                 .unload = unload_module);