ARI event type filtering
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "app.h"
29 #include "control.h"
30 #include "messaging.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/cli.h"
34 #include "asterisk/stasis_app.h"
35 #include "asterisk/stasis_bridges.h"
36 #include "asterisk/stasis_channels.h"
37 #include "asterisk/stasis_endpoints.h"
38 #include "asterisk/stasis_message_router.h"
39
40 #define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
41 #define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
42 #define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
43
44 /*! Global debug flag.  No need for locking */
45 int global_debug;
46
47 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49 struct stasis_app {
50         /*! Aggregation topic for this application. */
51         struct stasis_topic *topic;
52         /*! Router for handling messages forwarded to \a topic. */
53         struct stasis_message_router *router;
54         /*! Router for handling messages to the bridge all \a topic. */
55         struct stasis_message_router *bridge_router;
56         /*! Optional router for handling endpoint messages in 'all' subscriptions */
57         struct stasis_message_router *endpoint_router;
58         /*! Container of the channel forwards to this app's topic. */
59         struct ao2_container *forwards;
60         /*! Callback function for this application. */
61         stasis_app_cb handler;
62         /*! Opaque data to hand to callback function. */
63         void *data;
64         /*! Subscription model for the application */
65         enum stasis_app_subscription_model subscription_model;
66         /*! Whether or not someone wants to see debug messages about this app */
67         int debug;
68         /*! An array of allowed events types for this application */
69         struct ast_json *events_allowed;
70         /*! An array of disallowed events types for this application */
71         struct ast_json *events_disallowed;
72         /*! Name of the Stasis application */
73         char name[];
74 };
75
76 enum forward_type {
77         FORWARD_CHANNEL,
78         FORWARD_BRIDGE,
79         FORWARD_ENDPOINT,
80 };
81
82 /*! Subscription info for a particular channel/bridge. */
83 struct app_forwards {
84         /*! Count of number of times this channel/bridge has been subscribed */
85         int interested;
86
87         /*! Forward for the regular topic */
88         struct stasis_forward *topic_forward;
89         /*! Forward for the caching topic */
90         struct stasis_forward *topic_cached_forward;
91
92         /* Type of object being forwarded */
93         enum forward_type forward_type;
94         /*! Unique id of the object being forwarded */
95         char id[];
96 };
97
98 static void forwards_dtor(void *obj)
99 {
100 #ifdef AST_DEVMODE
101         struct app_forwards *forwards = obj;
102 #endif /* AST_DEVMODE */
103
104         ast_assert(forwards->topic_forward == NULL);
105         ast_assert(forwards->topic_cached_forward == NULL);
106 }
107
108 static void forwards_unsubscribe(struct app_forwards *forwards)
109 {
110         stasis_forward_cancel(forwards->topic_forward);
111         forwards->topic_forward = NULL;
112         stasis_forward_cancel(forwards->topic_cached_forward);
113         forwards->topic_cached_forward = NULL;
114 }
115
116 static struct app_forwards *forwards_create(struct stasis_app *app,
117         const char *id)
118 {
119         struct app_forwards *forwards;
120
121         if (!app || ast_strlen_zero(id)) {
122                 return NULL;
123         }
124
125         forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
126         if (!forwards) {
127                 return NULL;
128         }
129
130         strcpy(forwards->id, id); /* SAFE */
131
132         return forwards;
133 }
134
135 /*! Forward a channel's topics to an app */
136 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
137         struct ast_channel *chan)
138 {
139         struct app_forwards *forwards;
140
141         if (!app) {
142                 return NULL;
143         }
144
145         forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
146         if (!forwards) {
147                 return NULL;
148         }
149
150         forwards->forward_type = FORWARD_CHANNEL;
151         forwards->topic_forward = stasis_forward_all(
152                 chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
153                 app->topic);
154
155         if (!forwards->topic_forward) {
156                 ao2_ref(forwards, -1);
157                 return NULL;
158         }
159
160         return forwards;
161 }
162
163 /*! Forward a bridge's topics to an app */
164 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
165         struct ast_bridge *bridge)
166 {
167         struct app_forwards *forwards;
168
169         if (!app) {
170                 return NULL;
171         }
172
173         forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
174         if (!forwards) {
175                 return NULL;
176         }
177
178         forwards->forward_type = FORWARD_BRIDGE;
179         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
180
181         if (!forwards->topic_forward && bridge) {
182                 forwards_unsubscribe(forwards);
183                 ao2_ref(forwards, -1);
184                 return NULL;
185         }
186
187         return forwards;
188 }
189
190 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
191         struct stasis_message *message)
192 {
193         struct stasis_app *app = data;
194
195         stasis_publish(app->topic, message);
196 }
197
198 /*! Forward a endpoint's topics to an app */
199 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
200         struct ast_endpoint *endpoint)
201 {
202         struct app_forwards *forwards;
203         int ret = 0;
204
205         if (!app) {
206                 return NULL;
207         }
208
209         forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
210         if (!forwards) {
211                 return NULL;
212         }
213
214         forwards->forward_type = FORWARD_ENDPOINT;
215         if (endpoint) {
216                 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
217                         app->topic);
218                 forwards->topic_cached_forward = stasis_forward_all(
219                         ast_endpoint_topic_cached(endpoint), app->topic);
220
221                 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
222                         /* Half-subscribed is a bad thing */
223                         forwards_unsubscribe(forwards);
224                         ao2_ref(forwards, -1);
225                         return NULL;
226                 }
227         } else {
228                 /* Since endpoint subscriptions also subscribe to channels, in the case
229                  * of all endpoint subscriptions, we only want messages for the endpoints.
230                  * As such, we route those particular messages and then re-publish them
231                  * on the app's topic.
232                  */
233                 ast_assert(app->endpoint_router == NULL);
234                 app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
235                 if (!app->endpoint_router) {
236                         forwards_unsubscribe(forwards);
237                         ao2_ref(forwards, -1);
238                         return NULL;
239                 }
240
241                 ret |= stasis_message_router_add(app->endpoint_router,
242                         ast_endpoint_state_type(), endpoint_state_cb, app);
243                 ret |= stasis_message_router_add(app->endpoint_router,
244                         ast_endpoint_contact_state_type(), endpoint_state_cb, app);
245
246                 if (ret) {
247                         ao2_ref(app->endpoint_router, -1);
248                         app->endpoint_router = NULL;
249                         ao2_ref(forwards, -1);
250                         return NULL;
251                 }
252         }
253
254         return forwards;
255 }
256
257 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
258 {
259         const struct app_forwards *object_left = obj_left;
260         const struct app_forwards *object_right = obj_right;
261         const char *right_key = obj_right;
262         int cmp;
263
264         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
265         case OBJ_POINTER:
266                 right_key = object_right->id;
267                 /* Fall through */
268         case OBJ_KEY:
269                 cmp = strcmp(object_left->id, right_key);
270                 break;
271         case OBJ_PARTIAL_KEY:
272                 /*
273                  * We could also use a partial key struct containing a length
274                  * so strlen() does not get called for every comparison instead.
275                  */
276                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
277                 break;
278         default:
279                 /* Sort can only work on something with a full or partial key. */
280                 ast_assert(0);
281                 cmp = 0;
282                 break;
283         }
284         return cmp;
285 }
286
287 static void app_dtor(void *obj)
288 {
289         struct stasis_app *app = obj;
290         size_t size = strlen("stasis-") + strlen(app->name) + 1;
291         char context_name[size];
292
293         ast_verb(1, "Destroying Stasis app %s\n", app->name);
294
295         ast_assert(app->router == NULL);
296         ast_assert(app->bridge_router == NULL);
297         ast_assert(app->endpoint_router == NULL);
298
299         /* If we created a context for this application, remove it */
300         strcpy(context_name, "stasis-");
301         strcat(context_name, app->name);
302         ast_context_destroy_by_name(context_name, "res_stasis");
303
304         ao2_cleanup(app->topic);
305         app->topic = NULL;
306         ao2_cleanup(app->forwards);
307         app->forwards = NULL;
308         ao2_cleanup(app->data);
309         app->data = NULL;
310
311         ast_json_unref(app->events_allowed);
312         app->events_allowed = NULL;
313         ast_json_unref(app->events_disallowed);
314         app->events_disallowed = NULL;
315
316 }
317
318 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
319 {
320         struct ast_multi_channel_blob *payload = stasis_message_data(message);
321         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
322         struct ast_channel *chan;
323
324         if (!snapshot) {
325                 return;
326         }
327
328         chan = ast_channel_get_by_name(snapshot->base->uniqueid);
329         if (!chan) {
330                 return;
331         }
332
333         app_subscribe_channel(app, chan);
334         ast_channel_unref(chan);
335 }
336
337 static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
338         struct stasis_message *message)
339 {
340         struct stasis_app *app = data;
341
342         if (stasis_subscription_final_message(sub, message)) {
343                 ao2_cleanup(app);
344         }
345 }
346
347 static void sub_default_handler(void *data, struct stasis_subscription *sub,
348         struct stasis_message *message)
349 {
350         struct stasis_app *app = data;
351         struct ast_json *json;
352
353         /* The dial type can be converted to JSON so it will always be passed
354          * here.
355          */
356         if (stasis_message_type(message) == ast_channel_dial_type()) {
357                 call_forwarded_handler(app, message);
358         }
359
360         /* By default, send any message that has a JSON representation */
361         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
362         if (!json) {
363                 return;
364         }
365
366         app_send(app, json);
367         ast_json_unref(json);
368 }
369
370 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
371 typedef struct ast_json *(*channel_snapshot_monitor)(
372         struct ast_channel_snapshot *old_snapshot,
373         struct ast_channel_snapshot *new_snapshot,
374         const struct timeval *tv);
375
376 static struct ast_json *simple_channel_event(
377         const char *type,
378         struct ast_channel_snapshot *snapshot,
379         const struct timeval *tv)
380 {
381         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
382
383         if (!json_channel) {
384                 return NULL;
385         }
386
387         return ast_json_pack("{s: s, s: o, s: o}",
388                 "type", type,
389                 "timestamp", ast_json_timeval(*tv, NULL),
390                 "channel", json_channel);
391 }
392
393 static struct ast_json *channel_created_event(
394         struct ast_channel_snapshot *snapshot,
395         const struct timeval *tv)
396 {
397         return simple_channel_event("ChannelCreated", snapshot, tv);
398 }
399
400 static struct ast_json *channel_destroyed_event(
401         struct ast_channel_snapshot *snapshot,
402         const struct timeval *tv)
403 {
404         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
405
406         if (!json_channel) {
407                 return NULL;
408         }
409
410         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
411                 "type", "ChannelDestroyed",
412                 "timestamp", ast_json_timeval(*tv, NULL),
413                 "cause", snapshot->hangup->cause,
414                 "cause_txt", ast_cause2str(snapshot->hangup->cause),
415                 "channel", json_channel);
416 }
417
418 static struct ast_json *channel_state_change_event(
419         struct ast_channel_snapshot *snapshot,
420         const struct timeval *tv)
421 {
422         return simple_channel_event("ChannelStateChange", snapshot, tv);
423 }
424
425 /*! \brief Handle channel state changes */
426 static struct ast_json *channel_state(
427         struct ast_channel_snapshot *old_snapshot,
428         struct ast_channel_snapshot *new_snapshot,
429         const struct timeval *tv)
430 {
431         struct ast_channel_snapshot *snapshot = new_snapshot ?
432                 new_snapshot : old_snapshot;
433
434         if (!old_snapshot) {
435                 return channel_created_event(snapshot, tv);
436         } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
437                 return channel_destroyed_event(snapshot, tv);
438         } else if (old_snapshot->state != new_snapshot->state) {
439                 return channel_state_change_event(snapshot, tv);
440         }
441
442         return NULL;
443 }
444
445 static struct ast_json *channel_dialplan(
446         struct ast_channel_snapshot *old_snapshot,
447         struct ast_channel_snapshot *new_snapshot,
448         const struct timeval *tv)
449 {
450         struct ast_json *json_channel;
451
452         /* No Newexten event on first channel snapshot */
453         if (!old_snapshot) {
454                 return NULL;
455         }
456
457         /* Empty application is not valid for a Newexten event */
458         if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
459                 return NULL;
460         }
461
462         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
463                 return NULL;
464         }
465
466         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
467         if (!json_channel) {
468                 return NULL;
469         }
470
471         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
472                 "type", "ChannelDialplan",
473                 "timestamp", ast_json_timeval(*tv, NULL),
474                 "dialplan_app", new_snapshot->dialplan->appl,
475                 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
476                 "channel", json_channel);
477 }
478
479 static struct ast_json *channel_callerid(
480         struct ast_channel_snapshot *old_snapshot,
481         struct ast_channel_snapshot *new_snapshot,
482         const struct timeval *tv)
483 {
484         struct ast_json *json_channel;
485
486         /* No NewCallerid event on first channel snapshot */
487         if (!old_snapshot) {
488                 return NULL;
489         }
490
491         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
492                 return NULL;
493         }
494
495         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
496         if (!json_channel) {
497                 return NULL;
498         }
499
500         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
501                 "type", "ChannelCallerId",
502                 "timestamp", ast_json_timeval(*tv, NULL),
503                 "caller_presentation", new_snapshot->caller->pres,
504                 "caller_presentation_txt", ast_describe_caller_presentation(
505                         new_snapshot->caller->pres),
506                 "channel", json_channel);
507 }
508
509 static struct ast_json *channel_connected_line(
510         struct ast_channel_snapshot *old_snapshot,
511         struct ast_channel_snapshot *new_snapshot,
512         const struct timeval *tv)
513 {
514         struct ast_json *json_channel;
515
516         /* No ChannelConnectedLine event on first channel snapshot */
517         if (!old_snapshot) {
518                 return NULL;
519         }
520
521         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
522                 return NULL;
523         }
524
525         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
526         if (!json_channel) {
527                 return NULL;
528         }
529
530         return ast_json_pack("{s: s, s: o, s: o}",
531                 "type", "ChannelConnectedLine",
532                 "timestamp", ast_json_timeval(*tv, NULL),
533                 "channel", json_channel);
534 }
535
536 static channel_snapshot_monitor channel_monitors[] = {
537         channel_state,
538         channel_dialplan,
539         channel_callerid,
540         channel_connected_line,
541 };
542
543 static void sub_channel_update_handler(void *data,
544         struct stasis_subscription *sub,
545         struct stasis_message *message)
546 {
547         struct stasis_app *app = data;
548         struct ast_channel_snapshot_update *update = stasis_message_data(message);
549         int i;
550
551         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
552                 struct ast_json *msg;
553
554                 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
555                         stasis_message_timestamp(message));
556                 if (msg) {
557                         app_send(app, msg);
558                         ast_json_unref(msg);
559                 }
560         }
561
562         if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
563                 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
564         }
565 }
566
567 static struct ast_json *simple_endpoint_event(
568         const char *type,
569         struct ast_endpoint_snapshot *snapshot,
570         const struct timeval *tv)
571 {
572         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
573
574         if (!json_endpoint) {
575                 return NULL;
576         }
577
578         return ast_json_pack("{s: s, s: o, s: o}",
579                 "type", type,
580                 "timestamp", ast_json_timeval(*tv, NULL),
581                 "endpoint", json_endpoint);
582 }
583
584 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
585 {
586         struct ast_endpoint_snapshot *snapshot;
587         struct ast_json *json_endpoint;
588         struct ast_json *message;
589         struct stasis_app *app = pvt;
590         char *tech;
591         char *resource;
592
593         tech = ast_strdupa(endpoint_id);
594         resource = strchr(tech, '/');
595         if (resource) {
596                 resource[0] = '\0';
597                 resource++;
598         }
599
600         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
601                 return -1;
602         }
603
604         snapshot = ast_endpoint_latest_snapshot(tech, resource);
605         if (!snapshot) {
606                 return -1;
607         }
608
609         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
610         ao2_ref(snapshot, -1);
611         if (!json_endpoint) {
612                 return -1;
613         }
614
615         message = ast_json_pack("{s: s, s: o, s: o, s: o}",
616                 "type", "TextMessageReceived",
617                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
618                 "endpoint", json_endpoint,
619                 "message", ast_json_ref(json_msg));
620         if (message) {
621                 app_send(app, message);
622                 ast_json_unref(message);
623         }
624
625         return 0;
626 }
627
628 static void sub_endpoint_update_handler(void *data,
629         struct stasis_subscription *sub,
630         struct stasis_message *message)
631 {
632         struct stasis_app *app = data;
633         struct stasis_cache_update *update;
634         struct ast_endpoint_snapshot *new_snapshot;
635         struct ast_endpoint_snapshot *old_snapshot;
636         const struct timeval *tv;
637
638         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
639
640         update = stasis_message_data(message);
641
642         ast_assert(update->type == ast_endpoint_snapshot_type());
643
644         new_snapshot = stasis_message_data(update->new_snapshot);
645         old_snapshot = stasis_message_data(update->old_snapshot);
646
647         if (new_snapshot) {
648                 struct ast_json *json;
649
650                 tv = stasis_message_timestamp(update->new_snapshot);
651
652                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
653                 if (!json) {
654                         return;
655                 }
656
657                 app_send(app, json);
658                 ast_json_unref(json);
659         }
660
661         if (!new_snapshot && old_snapshot) {
662                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
663         }
664 }
665
666 static struct ast_json *simple_bridge_event(
667         const char *type,
668         struct ast_bridge_snapshot *snapshot,
669         const struct timeval *tv)
670 {
671         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
672         if (!json_bridge) {
673                 return NULL;
674         }
675
676         return ast_json_pack("{s: s, s: o, s: o}",
677                 "type", type,
678                 "timestamp", ast_json_timeval(*tv, NULL),
679                 "bridge", json_bridge);
680 }
681
682 static void sub_bridge_update_handler(void *data,
683         struct stasis_subscription *sub,
684         struct stasis_message *message)
685 {
686         struct ast_json *json = NULL;
687         struct stasis_app *app = data;
688         struct ast_bridge_snapshot_update *update;
689         const struct timeval *tv;
690
691         update = stasis_message_data(message);
692
693         tv = stasis_message_timestamp(message);
694
695         if (!update->new_snapshot) {
696                 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
697         } else if (!update->old_snapshot) {
698                 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
699         } else if (update->new_snapshot && update->old_snapshot
700                 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
701                 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
702                 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
703                         ast_json_object_set(json, "old_video_source_id",
704                                 ast_json_string_create(update->old_snapshot->video_source_id));
705                 }
706         }
707
708         if (json) {
709                 app_send(app, json);
710                 ast_json_unref(json);
711         }
712
713         if (!update->new_snapshot && update->old_snapshot) {
714                 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
715         }
716 }
717
718
719 /*! \brief Helper function for determining if the application is subscribed to a given entity */
720 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
721 {
722         struct app_forwards *forwards = NULL;
723
724         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
725         if (!forwards) {
726                 return 0;
727         }
728
729         ao2_ref(forwards, -1);
730         return 1;
731 }
732
733 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
734         struct stasis_message *message)
735 {
736         struct stasis_app *app = data;
737         struct ast_bridge_merge_message *merge;
738
739         merge = stasis_message_data(message);
740
741         /* Find out if we're subscribed to either bridge */
742         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
743                 bridge_app_subscribed(app, merge->to->uniqueid)) {
744                 /* Forward the message to the app */
745                 stasis_publish(app->topic, message);
746         }
747 }
748
749 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
750 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
751 {
752         int subscribed = 0;
753         struct ao2_iterator iter;
754         char *uniqueid;
755
756         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
757                 return 1;
758         }
759
760         iter = ao2_iterator_init(snapshot->channels, 0);
761         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
762                 if (bridge_app_subscribed(app, uniqueid)) {
763                         subscribed = 1;
764                         ao2_ref(uniqueid, -1);
765                         break;
766                 }
767         }
768         ao2_iterator_destroy(&iter);
769
770         return subscribed;
771 }
772
773 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
774         struct stasis_message *message)
775 {
776         struct stasis_app *app = data;
777         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
778         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
779
780         if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
781                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
782                 stasis_publish(app->topic, message);
783         }
784 }
785
786 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
787         struct stasis_message *message)
788 {
789         struct stasis_app *app = data;
790         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
791         int subscribed = 0;
792
793         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->base->uniqueid);
794         if (!subscribed) {
795                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->base->uniqueid);
796         }
797         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
798                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
799         }
800         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
801                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
802         }
803
804         if (!subscribed) {
805                 switch (transfer_msg->dest_type) {
806                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
807                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
808                         break;
809                 case AST_ATTENDED_TRANSFER_DEST_LINK:
810                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->base->uniqueid);
811                         if (!subscribed) {
812                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->base->uniqueid);
813                         }
814                         break;
815                 break;
816                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
817                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
818                         if (!subscribed) {
819                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->base->uniqueid);
820                         }
821                         break;
822                 default:
823                         break;
824                 }
825         }
826
827         if (subscribed) {
828                 stasis_publish(app->topic, message);
829         }
830 }
831
832 static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
833         struct stasis_message *message)
834 {
835         struct stasis_app *app = data;
836
837         if (stasis_subscription_final_message(sub, message)) {
838                 ao2_cleanup(app);
839         }
840 }
841
842 void stasis_app_set_debug(struct stasis_app *app, int debug)
843 {
844         if (!app) {
845                 return;
846         }
847
848         app->debug = debug;
849 }
850
851 void stasis_app_set_debug_by_name(const char *app_name, int debug)
852 {
853         struct stasis_app *app = stasis_app_get_by_name(app_name);
854
855         if (!app) {
856                 return;
857         }
858
859         app->debug = debug;
860         ao2_cleanup(app);
861 }
862
863 int stasis_app_get_debug(struct stasis_app *app)
864 {
865         return (app ? app->debug : 0) || global_debug;
866 }
867
868 int stasis_app_get_debug_by_name(const char *app_name)
869 {
870         int debug_enabled = 0;
871
872         if (global_debug) {
873                 debug_enabled = 1;
874         } else {
875                 struct stasis_app *app = stasis_app_get_by_name(app_name);
876
877                 if (app) {
878                         if (app->debug) {
879                                 debug_enabled = 1;
880                         }
881                         ao2_ref(app, -1);
882                 }
883         }
884         return debug_enabled;
885 }
886
887 void stasis_app_set_global_debug(int debug)
888 {
889         global_debug = debug;
890         if (!global_debug) {
891                 struct ao2_container *app_names = stasis_app_get_all();
892                 struct ao2_iterator it_app_names;
893                 char *app_name;
894                 struct stasis_app *app;
895
896                 if (!app_names || !ao2_container_count(app_names)) {
897                         ao2_cleanup(app_names);
898                         return;
899                 }
900
901                 it_app_names = ao2_iterator_init(app_names, 0);
902                 while ((app_name = ao2_iterator_next(&it_app_names))) {
903                         if ((app = stasis_app_get_by_name(app_name))) {
904                                 stasis_app_set_debug(app, 0);
905                         }
906
907                         ao2_cleanup(app_name);
908                         ao2_cleanup(app);
909                 }
910                 ao2_iterator_cleanup(&it_app_names);
911                 ao2_cleanup(app_names);
912         }
913 }
914
915 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
916 {
917         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
918         size_t size;
919         int res = 0;
920         size_t context_size = strlen("stasis-") + strlen(name) + 1;
921         char context_name[context_size];
922
923         ast_assert(name != NULL);
924         ast_assert(handler != NULL);
925
926         ast_verb(1, "Creating Stasis app '%s'\n", name);
927
928         size = sizeof(*app) + strlen(name) + 1;
929         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
930         if (!app) {
931                 return NULL;
932         }
933         app->subscription_model = subscription_model;
934
935         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
936                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
937                 forwards_sort, NULL);
938         if (!app->forwards) {
939                 return NULL;
940         }
941
942         app->topic = stasis_topic_create(name);
943         if (!app->topic) {
944                 return NULL;
945         }
946
947         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
948         if (!app->bridge_router) {
949                 return NULL;
950         }
951
952         res |= stasis_message_router_add(app->bridge_router,
953                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
954
955         res |= stasis_message_router_add(app->bridge_router,
956                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
957
958         res |= stasis_message_router_add(app->bridge_router,
959                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
960
961         res |= stasis_message_router_add(app->bridge_router,
962                 stasis_subscription_change_type(), bridge_subscription_change_handler, app);
963
964         if (res != 0) {
965                 return NULL;
966         }
967         /* Bridge router holds a reference */
968         ao2_ref(app, +1);
969
970         app->router = stasis_message_router_create(app->topic);
971         if (!app->router) {
972                 return NULL;
973         }
974
975         res |= stasis_message_router_add(app->router,
976                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
977
978         res |= stasis_message_router_add(app->router,
979                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
980
981         res |= stasis_message_router_add_cache_update(app->router,
982                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
983
984         res |= stasis_message_router_add(app->router,
985                 stasis_subscription_change_type(), sub_subscription_change_handler, app);
986
987         stasis_message_router_set_formatters_default(app->router,
988                 sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
989
990         if (res != 0) {
991                 return NULL;
992         }
993         /* Router holds a reference */
994         ao2_ref(app, +1);
995
996         strncpy(app->name, name, size - sizeof(*app));
997         app->handler = handler;
998         app->data = ao2_bump(data);
999
1000         /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
1001          * this should only be done if a context does not already exist. */
1002         strcpy(context_name, "stasis-");
1003         strcat(context_name, name);
1004         if (!ast_context_find(context_name)) {
1005                 if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
1006                         ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
1007                 } else {
1008                         ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
1009                         ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1010                 }
1011         } else {
1012                 ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1013                         context_name, name);
1014         }
1015
1016         ao2_ref(app, +1);
1017         return app;
1018 }
1019
1020 struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
1021 {
1022         return app->topic;
1023 }
1024
1025 /*!
1026  * \brief Send a message to the given application.
1027  * \param app App to send the message to.
1028  * \param message Message to send.
1029  */
1030 void app_send(struct stasis_app *app, struct ast_json *message)
1031 {
1032         stasis_app_cb handler;
1033         char eid[20];
1034         void *data;
1035
1036         if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
1037                         ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1038                 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1039                         ast_json_string_get(ast_json_object_get(message, "type")));
1040         }
1041
1042         /* Copy off mutable state with lock held */
1043         ao2_lock(app);
1044         handler = app->handler;
1045         data = ao2_bump(app->data);
1046         ao2_unlock(app);
1047         /* Name is immutable; no need to copy */
1048
1049         if (handler) {
1050                 handler(data, app->name, message);
1051         } else {
1052                 ast_verb(3,
1053                         "Inactive Stasis app '%s' missed message\n", app->name);
1054         }
1055         ao2_cleanup(data);
1056 }
1057
1058 void app_deactivate(struct stasis_app *app)
1059 {
1060         ao2_lock(app);
1061
1062         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1063         app->handler = NULL;
1064         ao2_cleanup(app->data);
1065         app->data = NULL;
1066
1067         ao2_unlock(app);
1068 }
1069
1070 void app_shutdown(struct stasis_app *app)
1071 {
1072         ao2_lock(app);
1073
1074         ast_assert(app_is_finished(app));
1075
1076         stasis_message_router_unsubscribe(app->router);
1077         app->router = NULL;
1078         stasis_message_router_unsubscribe(app->bridge_router);
1079         app->bridge_router = NULL;
1080         stasis_message_router_unsubscribe(app->endpoint_router);
1081         app->endpoint_router = NULL;
1082
1083         ao2_unlock(app);
1084 }
1085
1086 int app_is_active(struct stasis_app *app)
1087 {
1088         int ret;
1089
1090         ao2_lock(app);
1091         ret = app->handler != NULL;
1092         ao2_unlock(app);
1093
1094         return ret;
1095 }
1096
1097 int app_is_finished(struct stasis_app *app)
1098 {
1099         int ret;
1100
1101         ao2_lock(app);
1102         ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1103         ao2_unlock(app);
1104
1105         return ret;
1106 }
1107
1108 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
1109 {
1110         ao2_lock(app);
1111         if (app->handler && app->data) {
1112                 struct ast_json *msg;
1113
1114                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1115
1116                 msg = ast_json_pack("{s: s, s: s}",
1117                         "type", "ApplicationReplaced",
1118                         "application", app->name);
1119                 if (msg) {
1120                         app_send(app, msg);
1121                         ast_json_unref(msg);
1122                 }
1123         } else {
1124                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1125         }
1126
1127         app->handler = handler;
1128         ao2_replace(app->data, data);
1129         ao2_unlock(app);
1130 }
1131
1132 const char *stasis_app_name(const struct stasis_app *app)
1133 {
1134         return app->name;
1135 }
1136
1137 static int forwards_filter_by_type(void *obj, void *arg, int flags)
1138 {
1139         struct app_forwards *forward = obj;
1140         enum forward_type *forward_type = arg;
1141
1142         if (forward->forward_type == *forward_type) {
1143                 return CMP_MATCH;
1144         }
1145
1146         return 0;
1147 }
1148
1149 void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1150 {
1151         struct ao2_iterator *channels;
1152         struct ao2_iterator *endpoints;
1153         struct ao2_iterator *bridges;
1154         struct app_forwards *forward;
1155         enum forward_type forward_type;
1156
1157         ast_cli(a->fd, "Name: %s\n"
1158                 "  Debug: %s\n"
1159                 "  Subscription Model: %s\n",
1160                 app->name,
1161                 app->debug ? "Yes" : "No",
1162                 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1163                         "Global Resource Subscription" :
1164                         "Application/Explicit Resource Subscription");
1165         ast_cli(a->fd, "  Subscriptions: %d\n", ao2_container_count(app->forwards));
1166
1167         ast_cli(a->fd, "    Channels:\n");
1168         forward_type = FORWARD_CHANNEL;
1169         channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
1170                 forwards_filter_by_type, &forward_type);
1171         if (channels) {
1172                 while ((forward = ao2_iterator_next(channels))) {
1173                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1174                         ao2_ref(forward, -1);
1175                 }
1176                 ao2_iterator_destroy(channels);
1177         }
1178
1179         ast_cli(a->fd, "    Bridges:\n");
1180         forward_type = FORWARD_BRIDGE;
1181         bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1182                 forwards_filter_by_type, &forward_type);
1183         if (bridges) {
1184                 while ((forward = ao2_iterator_next(bridges))) {
1185                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1186                         ao2_ref(forward, -1);
1187                 }
1188                 ao2_iterator_destroy(bridges);
1189         }
1190
1191         ast_cli(a->fd, "    Endpoints:\n");
1192         forward_type = FORWARD_ENDPOINT;
1193         endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
1194                 forwards_filter_by_type, &forward_type);
1195         if (endpoints) {
1196                 while ((forward = ao2_iterator_next(endpoints))) {
1197                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1198                         ao2_ref(forward, -1);
1199                 }
1200                 ao2_iterator_destroy(endpoints);
1201         }
1202 }
1203
1204 struct ast_json *app_to_json(const struct stasis_app *app)
1205 {
1206         struct ast_json *json;
1207         struct ast_json *channels;
1208         struct ast_json *bridges;
1209         struct ast_json *endpoints;
1210         struct ao2_iterator i;
1211         struct app_forwards *forwards;
1212
1213         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1214                 "name", app->name,
1215                 "channel_ids", "bridge_ids", "endpoint_ids");
1216         if (!json) {
1217                 return NULL;
1218         }
1219         channels = ast_json_object_get(json, "channel_ids");
1220         bridges = ast_json_object_get(json, "bridge_ids");
1221         endpoints = ast_json_object_get(json, "endpoint_ids");
1222
1223         i = ao2_iterator_init(app->forwards, 0);
1224         while ((forwards = ao2_iterator_next(&i))) {
1225                 struct ast_json *array = NULL;
1226                 int append_res;
1227
1228                 switch (forwards->forward_type) {
1229                 case FORWARD_CHANNEL:
1230                         array = channels;
1231                         break;
1232                 case FORWARD_BRIDGE:
1233                         array = bridges;
1234                         break;
1235                 case FORWARD_ENDPOINT:
1236                         array = endpoints;
1237                         break;
1238                 }
1239
1240                 /* If forward_type value is unexpected this will safely return an error. */
1241                 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1242                 ao2_ref(forwards, -1);
1243
1244                 if (append_res != 0) {
1245                         ast_log(LOG_ERROR, "Error building response\n");
1246                         ao2_iterator_destroy(&i);
1247                         ast_json_unref(json);
1248
1249                         return NULL;
1250                 }
1251         }
1252         ao2_iterator_destroy(&i);
1253
1254         return json;
1255 }
1256
1257 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1258 {
1259         struct app_forwards *forwards;
1260
1261         if (!app) {
1262                 return -1;
1263         }
1264
1265         ao2_lock(app->forwards);
1266         /* If subscribed to all, don't subscribe again */
1267         forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1268         if (forwards) {
1269                 ao2_unlock(app->forwards);
1270                 ao2_ref(forwards, -1);
1271
1272                 return 0;
1273         }
1274
1275         forwards = ao2_find(app->forwards,
1276                 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1277                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1278         if (!forwards) {
1279                 int res;
1280
1281                 /* Forwards not found, create one */
1282                 forwards = forwards_create_channel(app, chan);
1283                 if (!forwards) {
1284                         ao2_unlock(app->forwards);
1285
1286                         return -1;
1287                 }
1288
1289                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1290                 if (!res) {
1291                         ao2_unlock(app->forwards);
1292                         ao2_ref(forwards, -1);
1293
1294                         return -1;
1295                 }
1296         }
1297
1298         ++forwards->interested;
1299         ast_debug(3, "Channel '%s' is %d interested in %s\n",
1300                 chan ? ast_channel_uniqueid(chan) : "ALL",
1301                 forwards->interested,
1302                 app->name);
1303
1304         ao2_unlock(app->forwards);
1305         ao2_ref(forwards, -1);
1306
1307         return 0;
1308 }
1309
1310 static int subscribe_channel(struct stasis_app *app, void *obj)
1311 {
1312         return app_subscribe_channel(app, obj);
1313 }
1314
1315 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1316 {
1317         struct app_forwards *forwards;
1318
1319         if (!id) {
1320                 if (!strcmp(kind, "bridge")) {
1321                         id = BRIDGE_ALL;
1322                 } else if (!strcmp(kind, "channel")) {
1323                         id = CHANNEL_ALL;
1324                 } else if (!strcmp(kind, "endpoint")) {
1325                         id = ENDPOINT_ALL;
1326                 } else {
1327                         ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1328                         return -1;
1329                 }
1330         }
1331
1332         ao2_lock(app->forwards);
1333         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1334         if (!forwards) {
1335                 ao2_unlock(app->forwards);
1336                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1337                 return -1;
1338         }
1339         forwards->interested--;
1340
1341         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1342         if (forwards->interested == 0 || terminate) {
1343                 /* No one is interested any more; unsubscribe */
1344                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1345                 forwards_unsubscribe(forwards);
1346                 ao2_find(app->forwards, forwards,
1347                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1348                         OBJ_NODATA);
1349
1350                 if (!strcmp(kind, "endpoint")) {
1351                         messaging_app_unsubscribe_endpoint(app->name, id);
1352                 }
1353         }
1354         ao2_unlock(app->forwards);
1355         ao2_ref(forwards, -1);
1356
1357         return 0;
1358 }
1359
1360 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1361 {
1362         if (!app) {
1363                 return -1;
1364         }
1365
1366         return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1367 }
1368
1369 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
1370 {
1371         if (!app) {
1372                 return -1;
1373         }
1374
1375         return unsubscribe(app, "channel", channel_id, 0);
1376 }
1377
1378 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1379 {
1380         struct app_forwards *forwards;
1381
1382         if (ast_strlen_zero(channel_id)) {
1383                 channel_id = CHANNEL_ALL;
1384         }
1385         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1386         ao2_cleanup(forwards);
1387
1388         return forwards != NULL;
1389 }
1390
1391 static void *channel_find(const struct stasis_app *app, const char *id)
1392 {
1393         return ast_channel_get_by_name(id);
1394 }
1395
1396 struct stasis_app_event_source channel_event_source = {
1397         .scheme = "channel:",
1398         .find = channel_find,
1399         .subscribe = subscribe_channel,
1400         .unsubscribe = app_unsubscribe_channel_id,
1401         .is_subscribed = app_is_subscribed_channel_id
1402 };
1403
1404 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1405 {
1406         struct app_forwards *forwards;
1407
1408         if (!app) {
1409                 return -1;
1410         }
1411
1412         ao2_lock(app->forwards);
1413         /* If subscribed to all, don't subscribe again */
1414         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1415         if (forwards) {
1416                 ao2_unlock(app->forwards);
1417                 ao2_ref(forwards, -1);
1418
1419                 return 0;
1420         }
1421
1422         forwards = ao2_find(app->forwards,
1423                 bridge ? bridge->uniqueid : BRIDGE_ALL,
1424                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1425         if (!forwards) {
1426                 int res;
1427
1428                 /* Forwards not found, create one */
1429                 forwards = forwards_create_bridge(app, bridge);
1430                 if (!forwards) {
1431                         ao2_unlock(app->forwards);
1432
1433                         return -1;
1434                 }
1435
1436                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1437                 if (!res) {
1438                         ao2_unlock(app->forwards);
1439                         ao2_ref(forwards, -1);
1440
1441                         return -1;
1442                 }
1443         }
1444
1445         ++forwards->interested;
1446         ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1447                 bridge ? bridge->uniqueid : "ALL",
1448                 forwards->interested,
1449                 app->name);
1450
1451         ao2_unlock(app->forwards);
1452         ao2_ref(forwards, -1);
1453
1454         return 0;
1455 }
1456
1457 static int subscribe_bridge(struct stasis_app *app, void *obj)
1458 {
1459         return app_subscribe_bridge(app, obj);
1460 }
1461
1462 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1463 {
1464         if (!app) {
1465                 return -1;
1466         }
1467
1468         return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1469 }
1470
1471 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
1472 {
1473         if (!app) {
1474                 return -1;
1475         }
1476
1477         return unsubscribe(app, "bridge", bridge_id, 0);
1478 }
1479
1480 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1481 {
1482         struct app_forwards *forwards;
1483
1484         if (ast_strlen_zero(bridge_id)) {
1485                 bridge_id = BRIDGE_ALL;
1486         }
1487
1488         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1489         ao2_cleanup(forwards);
1490
1491         return forwards != NULL;
1492 }
1493
1494 static void *bridge_find(const struct stasis_app *app, const char *id)
1495 {
1496         return stasis_app_bridge_find_by_id(id);
1497 }
1498
1499 struct stasis_app_event_source bridge_event_source = {
1500         .scheme = "bridge:",
1501         .find = bridge_find,
1502         .subscribe = subscribe_bridge,
1503         .unsubscribe = app_unsubscribe_bridge_id,
1504         .is_subscribed = app_is_subscribed_bridge_id
1505 };
1506
1507 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1508 {
1509         struct app_forwards *forwards;
1510
1511         if (!app) {
1512                 return -1;
1513         }
1514
1515         ao2_lock(app->forwards);
1516         /* If subscribed to all, don't subscribe again */
1517         forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1518         if (forwards) {
1519                 ao2_unlock(app->forwards);
1520                 ao2_ref(forwards, -1);
1521
1522                 return 0;
1523         }
1524
1525         forwards = ao2_find(app->forwards,
1526                 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1527                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1528         if (!forwards) {
1529                 int res;
1530
1531                 /* Forwards not found, create one */
1532                 forwards = forwards_create_endpoint(app, endpoint);
1533                 if (!forwards) {
1534                         ao2_unlock(app->forwards);
1535
1536                         return -1;
1537                 }
1538
1539                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1540                 if (!res) {
1541                         ao2_unlock(app->forwards);
1542                         ao2_ref(forwards, -1);
1543
1544                         return -1;
1545                 }
1546
1547                 /* Subscribe for messages */
1548                 messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1549         }
1550
1551         ++forwards->interested;
1552         ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1553                 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1554                 forwards->interested,
1555                 app->name);
1556
1557         ao2_unlock(app->forwards);
1558         ao2_ref(forwards, -1);
1559
1560         return 0;
1561 }
1562
1563 static int subscribe_endpoint(struct stasis_app *app, void *obj)
1564 {
1565         return app_subscribe_endpoint(app, obj);
1566 }
1567
1568 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1569 {
1570         if (!app) {
1571                 return -1;
1572         }
1573
1574         return unsubscribe(app, "endpoint", endpoint_id, 0);
1575 }
1576
1577 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1578 {
1579         struct app_forwards *forwards;
1580
1581         if (ast_strlen_zero(endpoint_id)) {
1582                 endpoint_id = ENDPOINT_ALL;
1583         }
1584         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1585         ao2_cleanup(forwards);
1586
1587         return forwards != NULL;
1588 }
1589
1590 static void *endpoint_find(const struct stasis_app *app, const char *id)
1591 {
1592         return ast_endpoint_find_by_id(id);
1593 }
1594
1595 struct stasis_app_event_source endpoint_event_source = {
1596         .scheme = "endpoint:",
1597         .find = endpoint_find,
1598         .subscribe = subscribe_endpoint,
1599         .unsubscribe = app_unsubscribe_endpoint_id,
1600         .is_subscribed = app_is_subscribed_endpoint_id
1601 };
1602
1603 void stasis_app_register_event_sources(void)
1604 {
1605         stasis_app_register_event_source(&channel_event_source);
1606         stasis_app_register_event_source(&bridge_event_source);
1607         stasis_app_register_event_source(&endpoint_event_source);
1608 }
1609
1610 void stasis_app_unregister_event_sources(void)
1611 {
1612         stasis_app_unregister_event_source(&endpoint_event_source);
1613         stasis_app_unregister_event_source(&bridge_event_source);
1614         stasis_app_unregister_event_source(&channel_event_source);
1615 }
1616
1617 struct ast_json *stasis_app_event_filter_to_json(struct stasis_app *app, struct ast_json *json)
1618 {
1619         if (!app || !json) {
1620                 return json;
1621         }
1622
1623         ast_json_object_set(json, "events_allowed", app->events_allowed ?
1624                 ast_json_ref(app->events_allowed) : ast_json_array_create());
1625         ast_json_object_set(json, "events_disallowed", app->events_disallowed ?
1626                 ast_json_ref(app->events_disallowed) : ast_json_array_create());
1627
1628         return json;
1629 }
1630
1631 static int app_event_filter_set(struct stasis_app *app, struct ast_json **member,
1632         struct ast_json *filter, const char *filter_type)
1633 {
1634         if (filter && ast_json_typeof(filter) == AST_JSON_OBJECT) {
1635                 if (!ast_json_object_size(filter)) {
1636                         /* If no filters are specified then reset this filter type */
1637                         filter = NULL;
1638                 } else {
1639                         /* Otherwise try to get the filter array for this type */
1640                         filter = ast_json_object_get(filter, filter_type);
1641                         if (!filter) {
1642                                 /* A filter type exists, but not this one, so don't update */
1643                                 return 0;
1644                         }
1645                 }
1646         }
1647
1648         /* At this point the filter object should be an array */
1649         if (filter && ast_json_typeof(filter) != AST_JSON_ARRAY) {
1650                 ast_log(LOG_ERROR, "Invalid json type event filter - app: %s, filter: %s\n",
1651                                 app->name, filter_type);
1652                 return -1;
1653         }
1654
1655         if (filter) {
1656                 /* Confirm that at least the type names are specified */
1657                 struct ast_json *obj;
1658                 int i;
1659
1660                 for (i = 0; i < ast_json_array_size(filter) &&
1661                                  (obj = ast_json_array_get(filter, i)); ++i) {
1662
1663                         if (ast_strlen_zero(ast_json_object_string_get(obj, "type"))) {
1664                                 ast_log(LOG_ERROR, "Filter event must have a type - app: %s, "
1665                                                 "filter: %s\n", app->name, filter_type);
1666                                 return -1;
1667                         }
1668                 }
1669         }
1670
1671         ao2_lock(app);
1672         ast_json_unref(*member);
1673         *member = filter ? ast_json_ref(filter) : NULL;
1674         ao2_unlock(app);
1675
1676         return 0;
1677 }
1678
1679 static int app_events_allowed_set(struct stasis_app *app, struct ast_json *filter)
1680 {
1681         return app_event_filter_set(app, &app->events_allowed, filter, "allowed");
1682 }
1683
1684 static int app_events_disallowed_set(struct stasis_app *app, struct ast_json *filter)
1685 {
1686         return app_event_filter_set(app, &app->events_disallowed, filter, "disallowed");
1687 }
1688
1689 int stasis_app_event_filter_set(struct stasis_app *app, struct ast_json *filter)
1690 {
1691         return app_events_disallowed_set(app, filter) || app_events_allowed_set(app, filter);
1692 }
1693
1694 static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
1695 {
1696         struct ast_json *obj;
1697         int i;
1698
1699         if (!array || !ast_json_array_size(array)) {
1700                 return empty;
1701         }
1702
1703         for (i = 0; i < ast_json_array_size(array) &&
1704                         (obj = ast_json_array_get(array, i)); ++i) {
1705
1706                 if (ast_strings_equal(ast_json_object_string_get(obj, "type"),
1707                                 ast_json_object_string_get(event, "type"))) {
1708                         return 1;
1709                 }
1710         }
1711
1712         return 0;
1713 }
1714
1715 int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
1716 {
1717         struct stasis_app *app = stasis_app_get_by_name(app_name);
1718         int res;
1719
1720         if (!app) {
1721                 return 0;
1722         }
1723
1724         ao2_lock(app);
1725         res = !app_event_filter_matched(app->events_disallowed, event, 0) &&
1726                 app_event_filter_matched(app->events_allowed, event, 1);
1727         ao2_unlock(app);
1728         ao2_ref(app, -1);
1729
1730         return res;
1731 }