Merge "Revert "PJSIP_CONTACT: add missing argument documentation""
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "app.h"
29 #include "control.h"
30 #include "messaging.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/cli.h"
34 #include "asterisk/stasis_app.h"
35 #include "asterisk/stasis_bridges.h"
36 #include "asterisk/stasis_channels.h"
37 #include "asterisk/stasis_endpoints.h"
38 #include "asterisk/stasis_message_router.h"
39
/*!
 * Identifiers used as the forward id when an application subscribes to
 * every bridge, channel or endpoint rather than to one specific object.
 */
#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"

/*! Debug flag applying to every application.  No need for locking */
int global_debug;

/* Declared early because the update handlers below call it before its definition. */
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49 struct stasis_app {
50         /*! Aggregation topic for this application. */
51         struct stasis_topic *topic;
52         /*! Router for handling messages forwarded to \a topic. */
53         struct stasis_message_router *router;
54         /*! Router for handling messages to the bridge all \a topic. */
55         struct stasis_message_router *bridge_router;
56         /*! Optional router for handling endpoint messages in 'all' subscriptions */
57         struct stasis_message_router *endpoint_router;
58         /*! Container of the channel forwards to this app's topic. */
59         struct ao2_container *forwards;
60         /*! Callback function for this application. */
61         stasis_app_cb handler;
62         /*! Opaque data to hand to callback function. */
63         void *data;
64         /*! Subscription model for the application */
65         enum stasis_app_subscription_model subscription_model;
66         /*! Whether or not someone wants to see debug messages about this app */
67         int debug;
68         /*! An array of allowed events types for this application */
69         struct ast_json *events_allowed;
70         /*! An array of disallowed events types for this application */
71         struct ast_json *events_disallowed;
72         /*! Name of the Stasis application */
73         char name[];
74 };
75
/*! Kind of object an app_forwards entry is forwarding for. */
enum forward_type {
	FORWARD_CHANNEL,
	FORWARD_BRIDGE,
	FORWARD_ENDPOINT,
};
81
82 /*! Subscription info for a particular channel/bridge. */
83 struct app_forwards {
84         /*! Count of number of times this channel/bridge has been subscribed */
85         int interested;
86
87         /*! Forward for the regular topic */
88         struct stasis_forward *topic_forward;
89         /*! Forward for the caching topic */
90         struct stasis_forward *topic_cached_forward;
91
92         /* Type of object being forwarded */
93         enum forward_type forward_type;
94         /*! Unique id of the object being forwarded */
95         char id[];
96 };
97
98 static void forwards_dtor(void *obj)
99 {
100 #ifdef AST_DEVMODE
101         struct app_forwards *forwards = obj;
102 #endif /* AST_DEVMODE */
103
104         ast_assert(forwards->topic_forward == NULL);
105         ast_assert(forwards->topic_cached_forward == NULL);
106 }
107
108 static void forwards_unsubscribe(struct app_forwards *forwards)
109 {
110         stasis_forward_cancel(forwards->topic_forward);
111         forwards->topic_forward = NULL;
112         stasis_forward_cancel(forwards->topic_cached_forward);
113         forwards->topic_cached_forward = NULL;
114 }
115
116 static struct app_forwards *forwards_create(struct stasis_app *app,
117         const char *id)
118 {
119         struct app_forwards *forwards;
120
121         if (!app || ast_strlen_zero(id)) {
122                 return NULL;
123         }
124
125         forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
126         if (!forwards) {
127                 return NULL;
128         }
129
130         strcpy(forwards->id, id); /* SAFE */
131
132         return forwards;
133 }
134
135 /*! Forward a channel's topics to an app */
136 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
137         struct ast_channel *chan)
138 {
139         struct app_forwards *forwards;
140
141         if (!app) {
142                 return NULL;
143         }
144
145         forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
146         if (!forwards) {
147                 return NULL;
148         }
149
150         forwards->forward_type = FORWARD_CHANNEL;
151         forwards->topic_forward = stasis_forward_all(
152                 chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
153                 app->topic);
154
155         if (!forwards->topic_forward) {
156                 ao2_ref(forwards, -1);
157                 return NULL;
158         }
159
160         return forwards;
161 }
162
163 /*! Forward a bridge's topics to an app */
164 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
165         struct ast_bridge *bridge)
166 {
167         struct app_forwards *forwards;
168
169         if (!app) {
170                 return NULL;
171         }
172
173         forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
174         if (!forwards) {
175                 return NULL;
176         }
177
178         forwards->forward_type = FORWARD_BRIDGE;
179         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
180
181         if (!forwards->topic_forward && bridge) {
182                 forwards_unsubscribe(forwards);
183                 ao2_ref(forwards, -1);
184                 return NULL;
185         }
186
187         return forwards;
188 }
189
190 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
191         struct stasis_message *message)
192 {
193         struct stasis_app *app = data;
194
195         stasis_publish(app->topic, message);
196 }
197
198 /*! Forward a endpoint's topics to an app */
199 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
200         struct ast_endpoint *endpoint)
201 {
202         struct app_forwards *forwards;
203         int ret = 0;
204
205         if (!app) {
206                 return NULL;
207         }
208
209         forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
210         if (!forwards) {
211                 return NULL;
212         }
213
214         forwards->forward_type = FORWARD_ENDPOINT;
215         if (endpoint) {
216                 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
217                         app->topic);
218                 forwards->topic_cached_forward = stasis_forward_all(
219                         ast_endpoint_topic_cached(endpoint), app->topic);
220
221                 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
222                         /* Half-subscribed is a bad thing */
223                         forwards_unsubscribe(forwards);
224                         ao2_ref(forwards, -1);
225                         return NULL;
226                 }
227         } else {
228                 /* Since endpoint subscriptions also subscribe to channels, in the case
229                  * of all endpoint subscriptions, we only want messages for the endpoints.
230                  * As such, we route those particular messages and then re-publish them
231                  * on the app's topic.
232                  */
233                 ast_assert(app->endpoint_router == NULL);
234                 app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
235                 if (!app->endpoint_router) {
236                         forwards_unsubscribe(forwards);
237                         ao2_ref(forwards, -1);
238                         return NULL;
239                 }
240
241                 ret |= stasis_message_router_add(app->endpoint_router,
242                         ast_endpoint_state_type(), endpoint_state_cb, app);
243                 ret |= stasis_message_router_add(app->endpoint_router,
244                         ast_endpoint_contact_state_type(), endpoint_state_cb, app);
245
246                 if (ret) {
247                         ao2_ref(app->endpoint_router, -1);
248                         app->endpoint_router = NULL;
249                         ao2_ref(forwards, -1);
250                         return NULL;
251                 }
252         }
253
254         return forwards;
255 }
256
257 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
258 {
259         const struct app_forwards *object_left = obj_left;
260         const struct app_forwards *object_right = obj_right;
261         const char *right_key = obj_right;
262         int cmp;
263
264         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
265         case OBJ_POINTER:
266                 right_key = object_right->id;
267                 /* Fall through */
268         case OBJ_KEY:
269                 cmp = strcmp(object_left->id, right_key);
270                 break;
271         case OBJ_PARTIAL_KEY:
272                 /*
273                  * We could also use a partial key struct containing a length
274                  * so strlen() does not get called for every comparison instead.
275                  */
276                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
277                 break;
278         default:
279                 /* Sort can only work on something with a full or partial key. */
280                 ast_assert(0);
281                 cmp = 0;
282                 break;
283         }
284         return cmp;
285 }
286
287 static void app_dtor(void *obj)
288 {
289         struct stasis_app *app = obj;
290         size_t size = strlen("stasis-") + strlen(app->name) + 1;
291         char context_name[size];
292
293         ast_verb(1, "Destroying Stasis app %s\n", app->name);
294
295         ast_assert(app->router == NULL);
296         ast_assert(app->bridge_router == NULL);
297         ast_assert(app->endpoint_router == NULL);
298
299         /* If we created a context for this application, remove it */
300         strcpy(context_name, "stasis-");
301         strcat(context_name, app->name);
302         ast_context_destroy_by_name(context_name, "res_stasis");
303
304         ao2_cleanup(app->topic);
305         app->topic = NULL;
306         ao2_cleanup(app->forwards);
307         app->forwards = NULL;
308         ao2_cleanup(app->data);
309         app->data = NULL;
310
311         ast_json_unref(app->events_allowed);
312         app->events_allowed = NULL;
313         ast_json_unref(app->events_disallowed);
314         app->events_disallowed = NULL;
315
316 }
317
318 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
319 {
320         struct ast_multi_channel_blob *payload = stasis_message_data(message);
321         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
322         struct ast_channel *chan;
323
324         if (!snapshot) {
325                 return;
326         }
327
328         chan = ast_channel_get_by_name(snapshot->base->uniqueid);
329         if (!chan) {
330                 return;
331         }
332
333         app_subscribe_channel(app, chan);
334         ast_channel_unref(chan);
335 }
336
/*!
 * \brief Subscription change handler for the application's router.
 *
 * Drops a reference on the app once the final message of the subscription
 * has been seen; every other message is ignored.
 *
 * \param data The stasis_app.
 * \param sub Subscription the message arrived on.
 * \param message The subscription change message.
 */
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
346
/*!
 * \brief Default route: send any message that has a JSON representation.
 *
 * Dial messages are additionally handed to call_forwarded_handler() first.
 *
 * \param data The stasis_app.
 * \param sub Subscription the message arrived on (unused).
 * \param message Message to convert and send.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;
	struct ast_json *event;

	/*
	 * The dial type can be converted to JSON, so it always ends up in
	 * this handler.
	 */
	if (stasis_message_type(message) == ast_channel_dial_type()) {
		call_forwarded_handler(app, message);
	}

	event = stasis_message_to_json(message, stasis_app_get_sanitizer());
	if (event) {
		app_send(app, event);
		ast_json_unref(event);
	}
}
369
/*!
 * \brief Signature of the callbacks run for every channel snapshot update.
 *
 * A callback returns the JSON event to send, or NULL when it has nothing to
 * report for the given pair of snapshots.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
375
376 static struct ast_json *simple_channel_event(
377         const char *type,
378         struct ast_channel_snapshot *snapshot,
379         const struct timeval *tv)
380 {
381         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
382
383         if (!json_channel) {
384                 return NULL;
385         }
386
387         return ast_json_pack("{s: s, s: o, s: o}",
388                 "type", type,
389                 "timestamp", ast_json_timeval(*tv, NULL),
390                 "channel", json_channel);
391 }
392
/*! \brief Build a "ChannelCreated" event for \a snapshot stamped with \a tv. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelCreated", snapshot, tv);

	return event;
}
399
400 static struct ast_json *channel_destroyed_event(
401         struct ast_channel_snapshot *snapshot,
402         const struct timeval *tv)
403 {
404         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
405
406         if (!json_channel) {
407                 return NULL;
408         }
409
410         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
411                 "type", "ChannelDestroyed",
412                 "timestamp", ast_json_timeval(*tv, NULL),
413                 "cause", snapshot->hangup->cause,
414                 "cause_txt", ast_cause2str(snapshot->hangup->cause),
415                 "channel", json_channel);
416 }
417
/*! \brief Build a "ChannelStateChange" event for \a snapshot stamped with \a tv. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelStateChange", snapshot, tv);

	return event;
}
424
425 /*! \brief Handle channel state changes */
426 static struct ast_json *channel_state(
427         struct ast_channel_snapshot *old_snapshot,
428         struct ast_channel_snapshot *new_snapshot,
429         const struct timeval *tv)
430 {
431         struct ast_channel_snapshot *snapshot = new_snapshot ?
432                 new_snapshot : old_snapshot;
433
434         if (!old_snapshot) {
435                 return channel_created_event(snapshot, tv);
436         } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
437                 return channel_destroyed_event(snapshot, tv);
438         } else if (old_snapshot->state != new_snapshot->state) {
439                 return channel_state_change_event(snapshot, tv);
440         }
441
442         return NULL;
443 }
444
445 static struct ast_json *channel_dialplan(
446         struct ast_channel_snapshot *old_snapshot,
447         struct ast_channel_snapshot *new_snapshot,
448         const struct timeval *tv)
449 {
450         struct ast_json *json_channel;
451
452         /* No Newexten event on first channel snapshot */
453         if (!old_snapshot) {
454                 return NULL;
455         }
456
457         /* Empty application is not valid for a Newexten event */
458         if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
459                 return NULL;
460         }
461
462         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
463                 return NULL;
464         }
465
466         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
467         if (!json_channel) {
468                 return NULL;
469         }
470
471         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
472                 "type", "ChannelDialplan",
473                 "timestamp", ast_json_timeval(*tv, NULL),
474                 "dialplan_app", new_snapshot->dialplan->appl,
475                 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
476                 "channel", json_channel);
477 }
478
479 static struct ast_json *channel_callerid(
480         struct ast_channel_snapshot *old_snapshot,
481         struct ast_channel_snapshot *new_snapshot,
482         const struct timeval *tv)
483 {
484         struct ast_json *json_channel;
485
486         /* No NewCallerid event on first channel snapshot */
487         if (!old_snapshot) {
488                 return NULL;
489         }
490
491         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
492                 return NULL;
493         }
494
495         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
496         if (!json_channel) {
497                 return NULL;
498         }
499
500         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
501                 "type", "ChannelCallerId",
502                 "timestamp", ast_json_timeval(*tv, NULL),
503                 "caller_presentation", new_snapshot->caller->pres,
504                 "caller_presentation_txt", ast_describe_caller_presentation(
505                         new_snapshot->caller->pres),
506                 "channel", json_channel);
507 }
508
509 static struct ast_json *channel_connected_line(
510         struct ast_channel_snapshot *old_snapshot,
511         struct ast_channel_snapshot *new_snapshot,
512         const struct timeval *tv)
513 {
514         struct ast_json *json_channel;
515
516         /* No ChannelConnectedLine event on first channel snapshot */
517         if (!old_snapshot) {
518                 return NULL;
519         }
520
521         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
522                 return NULL;
523         }
524
525         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
526         if (!json_channel) {
527                 return NULL;
528         }
529
530         return ast_json_pack("{s: s, s: o, s: o}",
531                 "type", "ChannelConnectedLine",
532                 "timestamp", ast_json_timeval(*tv, NULL),
533                 "channel", json_channel);
534 }
535
536 static channel_snapshot_monitor channel_monitors[] = {
537         channel_state,
538         channel_dialplan,
539         channel_callerid,
540         channel_connected_line,
541 };
542
543 static void sub_channel_update_handler(void *data,
544         struct stasis_subscription *sub,
545         struct stasis_message *message)
546 {
547         struct stasis_app *app = data;
548         struct ast_channel_snapshot_update *update = stasis_message_data(message);
549         int i;
550
551         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
552                 struct ast_json *msg;
553
554                 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
555                         stasis_message_timestamp(message));
556                 if (msg) {
557                         app_send(app, msg);
558                         ast_json_unref(msg);
559                 }
560         }
561
562         if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
563                 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
564         }
565 }
566
567 static struct ast_json *simple_endpoint_event(
568         const char *type,
569         struct ast_endpoint_snapshot *snapshot,
570         const struct timeval *tv)
571 {
572         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
573
574         if (!json_endpoint) {
575                 return NULL;
576         }
577
578         return ast_json_pack("{s: s, s: o, s: o}",
579                 "type", type,
580                 "timestamp", ast_json_timeval(*tv, NULL),
581                 "endpoint", json_endpoint);
582 }
583
/*!
 * \brief Send a "TextMessageReceived" event for a message on an endpoint.
 *
 * \param endpoint_id Endpoint id in "tech/resource" form.
 * \param json_msg JSON form of the message; a new reference is taken for the event.
 * \param pvt The stasis_app to deliver to.
 *
 * \retval 0 if the endpoint was resolved and serialised (also when building
 *         the final event failed).
 * \retval -1 if the id is malformed, no snapshot exists for it, or the
 *         snapshot could not be converted to JSON.
 */
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
{
	struct stasis_app *app = pvt;
	struct ast_endpoint_snapshot *snapshot;
	struct ast_json *endpoint_json;
	struct ast_json *event;
	char *tech;
	char *resource;

	/* Split a private copy of the id at the first '/'. */
	tech = ast_strdupa(endpoint_id);
	resource = strchr(tech, '/');
	if (resource) {
		*resource++ = '\0';
	}

	if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
		return -1;
	}

	snapshot = ast_endpoint_latest_snapshot(tech, resource);
	if (!snapshot) {
		return -1;
	}

	/* The snapshot is only needed for its JSON form. */
	endpoint_json = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
	ao2_ref(snapshot, -1);
	if (!endpoint_json) {
		return -1;
	}

	event = ast_json_pack("{s: s, s: o, s: o, s: o}",
		"type", "TextMessageReceived",
		"timestamp", ast_json_timeval(ast_tvnow(), NULL),
		"endpoint", endpoint_json,
		"message", ast_json_ref(json_msg));
	if (!event) {
		return 0;
	}

	app_send(app, event);
	ast_json_unref(event);

	return 0;
}
627
628 static void sub_endpoint_update_handler(void *data,
629         struct stasis_subscription *sub,
630         struct stasis_message *message)
631 {
632         struct stasis_app *app = data;
633         struct stasis_cache_update *update;
634         struct ast_endpoint_snapshot *new_snapshot;
635         struct ast_endpoint_snapshot *old_snapshot;
636         const struct timeval *tv;
637
638         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
639
640         update = stasis_message_data(message);
641
642         ast_assert(update->type == ast_endpoint_snapshot_type());
643
644         new_snapshot = stasis_message_data(update->new_snapshot);
645         old_snapshot = stasis_message_data(update->old_snapshot);
646
647         if (new_snapshot) {
648                 struct ast_json *json;
649
650                 tv = stasis_message_timestamp(update->new_snapshot);
651
652                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
653                 if (!json) {
654                         return;
655                 }
656
657                 app_send(app, json);
658                 ast_json_unref(json);
659         }
660
661         if (!new_snapshot && old_snapshot) {
662                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
663         }
664 }
665
666 static struct ast_json *simple_bridge_event(
667         const char *type,
668         struct ast_bridge_snapshot *snapshot,
669         const struct timeval *tv)
670 {
671         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
672         if (!json_bridge) {
673                 return NULL;
674         }
675
676         return ast_json_pack("{s: s, s: o, s: o}",
677                 "type", type,
678                 "timestamp", ast_json_timeval(*tv, NULL),
679                 "bridge", json_bridge);
680 }
681
682 static void sub_bridge_update_handler(void *data,
683         struct stasis_subscription *sub,
684         struct stasis_message *message)
685 {
686         struct ast_json *json = NULL;
687         struct stasis_app *app = data;
688         struct ast_bridge_snapshot_update *update;
689         const struct timeval *tv;
690
691         update = stasis_message_data(message);
692
693         tv = stasis_message_timestamp(message);
694
695         if (!update->new_snapshot) {
696                 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
697         } else if (!update->old_snapshot) {
698                 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
699         } else if (update->new_snapshot && update->old_snapshot
700                 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
701                 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
702                 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
703                         ast_json_object_set(json, "old_video_source_id",
704                                 ast_json_string_create(update->old_snapshot->video_source_id));
705                 }
706         }
707
708         if (json) {
709                 app_send(app, json);
710                 ast_json_unref(json);
711         }
712
713         if (!update->new_snapshot && update->old_snapshot) {
714                 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
715         }
716 }
717
718
719 /*! \brief Helper function for determining if the application is subscribed to a given entity */
720 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
721 {
722         struct app_forwards *forwards = NULL;
723
724         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
725         if (!forwards) {
726                 return 0;
727         }
728
729         ao2_ref(forwards, -1);
730         return 1;
731 }
732
733 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
734         struct stasis_message *message)
735 {
736         struct stasis_app *app = data;
737         struct ast_bridge_merge_message *merge;
738
739         merge = stasis_message_data(message);
740
741         /* Find out if we're subscribed to either bridge */
742         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
743                 bridge_app_subscribed(app, merge->to->uniqueid)) {
744                 /* Forward the message to the app */
745                 stasis_publish(app->topic, message);
746         }
747 }
748
749 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
750 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
751 {
752         int subscribed = 0;
753         struct ao2_iterator iter;
754         char *uniqueid;
755
756         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
757                 return 1;
758         }
759
760         iter = ao2_iterator_init(snapshot->channels, 0);
761         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
762                 if (bridge_app_subscribed(app, uniqueid)) {
763                         subscribed = 1;
764                         ao2_ref(uniqueid, -1);
765                         break;
766                 }
767         }
768         ao2_iterator_destroy(&iter);
769
770         return subscribed;
771 }
772
773 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
774         struct stasis_message *message)
775 {
776         struct stasis_app *app = data;
777         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
778         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
779
780         if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
781                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
782                 stasis_publish(app->topic, message);
783         }
784 }
785
786 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
787         struct stasis_message *message)
788 {
789         struct stasis_app *app = data;
790         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
791         int subscribed = 0;
792
793         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->base->uniqueid);
794         if (!subscribed) {
795                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->base->uniqueid);
796         }
797         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
798                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
799         }
800         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
801                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
802         }
803
804         if (!subscribed) {
805                 switch (transfer_msg->dest_type) {
806                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
807                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
808                         break;
809                 case AST_ATTENDED_TRANSFER_DEST_LINK:
810                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->base->uniqueid);
811                         if (!subscribed) {
812                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->base->uniqueid);
813                         }
814                         break;
815                 break;
816                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
817                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
818                         if (!subscribed) {
819                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->base->uniqueid);
820                         }
821                         break;
822                 default:
823                         break;
824                 }
825         }
826
827         if (subscribed) {
828                 stasis_publish(app->topic, message);
829         }
830 }
831
/*!
 * \brief Subscription change handler for the application's bridge router.
 *
 * Drops a reference on the app once the final message of the subscription
 * has been seen; every other message is ignored.
 *
 * \param data The stasis_app.
 * \param sub Subscription the message arrived on.
 * \param message The subscription change message.
 */
static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
841
842 void stasis_app_set_debug(struct stasis_app *app, int debug)
843 {
844         if (!app) {
845                 return;
846         }
847
848         app->debug = debug;
849 }
850
851 void stasis_app_set_debug_by_name(const char *app_name, int debug)
852 {
853         struct stasis_app *app = stasis_app_get_by_name(app_name);
854
855         if (!app) {
856                 return;
857         }
858
859         app->debug = debug;
860         ao2_cleanup(app);
861 }
862
863 int stasis_app_get_debug(struct stasis_app *app)
864 {
865         return (app ? app->debug : 0) || global_debug;
866 }
867
868 int stasis_app_get_debug_by_name(const char *app_name)
869 {
870         int debug_enabled = 0;
871
872         if (global_debug) {
873                 debug_enabled = 1;
874         } else {
875                 struct stasis_app *app = stasis_app_get_by_name(app_name);
876
877                 if (app) {
878                         if (app->debug) {
879                                 debug_enabled = 1;
880                         }
881                         ao2_ref(app, -1);
882                 }
883         }
884         return debug_enabled;
885 }
886
887 void stasis_app_set_global_debug(int debug)
888 {
889         global_debug = debug;
890         if (!global_debug) {
891                 struct ao2_container *app_names = stasis_app_get_all();
892                 struct ao2_iterator it_app_names;
893                 char *app_name;
894                 struct stasis_app *app;
895
896                 if (!app_names || !ao2_container_count(app_names)) {
897                         ao2_cleanup(app_names);
898                         return;
899                 }
900
901                 it_app_names = ao2_iterator_init(app_names, 0);
902                 while ((app_name = ao2_iterator_next(&it_app_names))) {
903                         if ((app = stasis_app_get_by_name(app_name))) {
904                                 stasis_app_set_debug(app, 0);
905                         }
906
907                         ao2_cleanup(app_name);
908                         ao2_cleanup(app);
909                 }
910                 ao2_iterator_cleanup(&it_app_names);
911                 ao2_cleanup(app_names);
912         }
913 }
914
915 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
916 {
917         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
918         size_t size;
919         int res = 0;
920         size_t context_size = strlen("stasis-") + strlen(name) + 1;
921         char context_name[context_size];
922         char *topic_name;
923         int ret;
924
925         ast_assert(name != NULL);
926         ast_assert(handler != NULL);
927
928         ast_verb(1, "Creating Stasis app '%s'\n", name);
929
930         size = sizeof(*app) + strlen(name) + 1;
931         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
932         if (!app) {
933                 return NULL;
934         }
935         app->subscription_model = subscription_model;
936
937         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
938                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
939                 forwards_sort, NULL);
940         if (!app->forwards) {
941                 return NULL;
942         }
943
944         ret = ast_asprintf(&topic_name, "ari:application/%s", name);
945         if (ret < 0) {
946                 return NULL;
947         }
948
949         app->topic = stasis_topic_create(topic_name);
950         ast_free(topic_name);
951         if (!app->topic) {
952                 return NULL;
953         }
954
955         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
956         if (!app->bridge_router) {
957                 return NULL;
958         }
959
960         res |= stasis_message_router_add(app->bridge_router,
961                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
962
963         res |= stasis_message_router_add(app->bridge_router,
964                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
965
966         res |= stasis_message_router_add(app->bridge_router,
967                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
968
969         res |= stasis_message_router_add(app->bridge_router,
970                 stasis_subscription_change_type(), bridge_subscription_change_handler, app);
971
972         if (res != 0) {
973                 return NULL;
974         }
975         /* Bridge router holds a reference */
976         ao2_ref(app, +1);
977
978         app->router = stasis_message_router_create(app->topic);
979         if (!app->router) {
980                 return NULL;
981         }
982
983         res |= stasis_message_router_add(app->router,
984                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
985
986         res |= stasis_message_router_add(app->router,
987                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
988
989         res |= stasis_message_router_add_cache_update(app->router,
990                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
991
992         res |= stasis_message_router_add(app->router,
993                 stasis_subscription_change_type(), sub_subscription_change_handler, app);
994
995         stasis_message_router_set_formatters_default(app->router,
996                 sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
997
998         if (res != 0) {
999                 return NULL;
1000         }
1001         /* Router holds a reference */
1002         ao2_ref(app, +1);
1003
1004         strncpy(app->name, name, size - sizeof(*app));
1005         app->handler = handler;
1006         app->data = ao2_bump(data);
1007
1008         /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
1009          * this should only be done if a context does not already exist. */
1010         strcpy(context_name, "stasis-");
1011         strcat(context_name, name);
1012         if (!ast_context_find(context_name)) {
1013                 if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
1014                         ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
1015                 } else {
1016                         ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
1017                         ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1018                 }
1019         } else {
1020                 ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1021                         context_name, name);
1022         }
1023
1024         ao2_ref(app, +1);
1025         return app;
1026 }
1027
1028 struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
1029 {
1030         return app->topic;
1031 }
1032
1033 /*!
1034  * \brief Send a message to the given application.
1035  * \param app App to send the message to.
1036  * \param message Message to send.
1037  */
1038 void app_send(struct stasis_app *app, struct ast_json *message)
1039 {
1040         stasis_app_cb handler;
1041         char eid[20];
1042         void *data;
1043
1044         if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
1045                         ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1046                 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1047                         ast_json_string_get(ast_json_object_get(message, "type")));
1048         }
1049
1050         /* Copy off mutable state with lock held */
1051         ao2_lock(app);
1052         handler = app->handler;
1053         data = ao2_bump(app->data);
1054         ao2_unlock(app);
1055         /* Name is immutable; no need to copy */
1056
1057         if (handler) {
1058                 handler(data, app->name, message);
1059         } else {
1060                 ast_verb(3,
1061                         "Inactive Stasis app '%s' missed message\n", app->name);
1062         }
1063         ao2_cleanup(data);
1064 }
1065
1066 void app_deactivate(struct stasis_app *app)
1067 {
1068         ao2_lock(app);
1069
1070         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1071         app->handler = NULL;
1072         ao2_cleanup(app->data);
1073         app->data = NULL;
1074
1075         ao2_unlock(app);
1076 }
1077
1078 void app_shutdown(struct stasis_app *app)
1079 {
1080         ao2_lock(app);
1081
1082         ast_assert(app_is_finished(app));
1083
1084         stasis_message_router_unsubscribe(app->router);
1085         app->router = NULL;
1086         stasis_message_router_unsubscribe(app->bridge_router);
1087         app->bridge_router = NULL;
1088         stasis_message_router_unsubscribe(app->endpoint_router);
1089         app->endpoint_router = NULL;
1090
1091         ao2_unlock(app);
1092 }
1093
1094 int app_is_active(struct stasis_app *app)
1095 {
1096         int ret;
1097
1098         ao2_lock(app);
1099         ret = app->handler != NULL;
1100         ao2_unlock(app);
1101
1102         return ret;
1103 }
1104
1105 int app_is_finished(struct stasis_app *app)
1106 {
1107         int ret;
1108
1109         ao2_lock(app);
1110         ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1111         ao2_unlock(app);
1112
1113         return ret;
1114 }
1115
1116 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
1117 {
1118         ao2_lock(app);
1119         if (app->handler && app->data) {
1120                 struct ast_json *msg;
1121
1122                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1123
1124                 msg = ast_json_pack("{s: s, s: o?, s: s}",
1125                         "type", "ApplicationReplaced",
1126                         "timestamp", ast_json_timeval(ast_tvnow(), NULL),
1127                         "application", app->name);
1128                 if (msg) {
1129                         app_send(app, msg);
1130                         ast_json_unref(msg);
1131                 }
1132         } else {
1133                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1134         }
1135
1136         app->handler = handler;
1137         ao2_replace(app->data, data);
1138         ao2_unlock(app);
1139 }
1140
1141 const char *stasis_app_name(const struct stasis_app *app)
1142 {
1143         return app->name;
1144 }
1145
1146 static int forwards_filter_by_type(void *obj, void *arg, int flags)
1147 {
1148         struct app_forwards *forward = obj;
1149         enum forward_type *forward_type = arg;
1150
1151         if (forward->forward_type == *forward_type) {
1152                 return CMP_MATCH;
1153         }
1154
1155         return 0;
1156 }
1157
1158 void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1159 {
1160         struct ao2_iterator *channels;
1161         struct ao2_iterator *endpoints;
1162         struct ao2_iterator *bridges;
1163         struct app_forwards *forward;
1164         enum forward_type forward_type;
1165
1166         ast_cli(a->fd, "Name: %s\n"
1167                 "  Debug: %s\n"
1168                 "  Subscription Model: %s\n",
1169                 app->name,
1170                 app->debug ? "Yes" : "No",
1171                 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1172                         "Global Resource Subscription" :
1173                         "Application/Explicit Resource Subscription");
1174         ast_cli(a->fd, "  Subscriptions: %d\n", ao2_container_count(app->forwards));
1175
1176         ast_cli(a->fd, "    Channels:\n");
1177         forward_type = FORWARD_CHANNEL;
1178         channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
1179                 forwards_filter_by_type, &forward_type);
1180         if (channels) {
1181                 while ((forward = ao2_iterator_next(channels))) {
1182                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1183                         ao2_ref(forward, -1);
1184                 }
1185                 ao2_iterator_destroy(channels);
1186         }
1187
1188         ast_cli(a->fd, "    Bridges:\n");
1189         forward_type = FORWARD_BRIDGE;
1190         bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1191                 forwards_filter_by_type, &forward_type);
1192         if (bridges) {
1193                 while ((forward = ao2_iterator_next(bridges))) {
1194                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1195                         ao2_ref(forward, -1);
1196                 }
1197                 ao2_iterator_destroy(bridges);
1198         }
1199
1200         ast_cli(a->fd, "    Endpoints:\n");
1201         forward_type = FORWARD_ENDPOINT;
1202         endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
1203                 forwards_filter_by_type, &forward_type);
1204         if (endpoints) {
1205                 while ((forward = ao2_iterator_next(endpoints))) {
1206                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1207                         ao2_ref(forward, -1);
1208                 }
1209                 ao2_iterator_destroy(endpoints);
1210         }
1211 }
1212
1213 struct ast_json *app_to_json(const struct stasis_app *app)
1214 {
1215         struct ast_json *json;
1216         struct ast_json *channels;
1217         struct ast_json *bridges;
1218         struct ast_json *endpoints;
1219         struct ao2_iterator i;
1220         struct app_forwards *forwards;
1221
1222         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1223                 "name", app->name,
1224                 "channel_ids", "bridge_ids", "endpoint_ids");
1225         if (!json) {
1226                 return NULL;
1227         }
1228         channels = ast_json_object_get(json, "channel_ids");
1229         bridges = ast_json_object_get(json, "bridge_ids");
1230         endpoints = ast_json_object_get(json, "endpoint_ids");
1231
1232         i = ao2_iterator_init(app->forwards, 0);
1233         while ((forwards = ao2_iterator_next(&i))) {
1234                 struct ast_json *array = NULL;
1235                 int append_res;
1236
1237                 switch (forwards->forward_type) {
1238                 case FORWARD_CHANNEL:
1239                         array = channels;
1240                         break;
1241                 case FORWARD_BRIDGE:
1242                         array = bridges;
1243                         break;
1244                 case FORWARD_ENDPOINT:
1245                         array = endpoints;
1246                         break;
1247                 }
1248
1249                 /* If forward_type value is unexpected this will safely return an error. */
1250                 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1251                 ao2_ref(forwards, -1);
1252
1253                 if (append_res != 0) {
1254                         ast_log(LOG_ERROR, "Error building response\n");
1255                         ao2_iterator_destroy(&i);
1256                         ast_json_unref(json);
1257
1258                         return NULL;
1259                 }
1260         }
1261         ao2_iterator_destroy(&i);
1262
1263         return json;
1264 }
1265
1266 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1267 {
1268         struct app_forwards *forwards;
1269
1270         if (!app) {
1271                 return -1;
1272         }
1273
1274         ao2_lock(app->forwards);
1275         /* If subscribed to all, don't subscribe again */
1276         forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1277         if (forwards) {
1278                 ao2_unlock(app->forwards);
1279                 ao2_ref(forwards, -1);
1280
1281                 return 0;
1282         }
1283
1284         forwards = ao2_find(app->forwards,
1285                 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1286                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1287         if (!forwards) {
1288                 int res;
1289
1290                 /* Forwards not found, create one */
1291                 forwards = forwards_create_channel(app, chan);
1292                 if (!forwards) {
1293                         ao2_unlock(app->forwards);
1294
1295                         return -1;
1296                 }
1297
1298                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1299                 if (!res) {
1300                         ao2_unlock(app->forwards);
1301                         ao2_ref(forwards, -1);
1302
1303                         return -1;
1304                 }
1305         }
1306
1307         ++forwards->interested;
1308         ast_debug(3, "Channel '%s' is %d interested in %s\n",
1309                 chan ? ast_channel_uniqueid(chan) : "ALL",
1310                 forwards->interested,
1311                 app->name);
1312
1313         ao2_unlock(app->forwards);
1314         ao2_ref(forwards, -1);
1315
1316         return 0;
1317 }
1318
/*!
 * \brief Event-source adapter that forwards to app_subscribe_channel().
 *
 * \param app Application to subscribe.
 * \param obj Channel object, passed through as an untyped pointer.
 *
 * \return Whatever app_subscribe_channel() returns.
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1323
1324 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1325 {
1326         struct app_forwards *forwards;
1327
1328         if (!id) {
1329                 if (!strcmp(kind, "bridge")) {
1330                         id = BRIDGE_ALL;
1331                 } else if (!strcmp(kind, "channel")) {
1332                         id = CHANNEL_ALL;
1333                 } else if (!strcmp(kind, "endpoint")) {
1334                         id = ENDPOINT_ALL;
1335                 } else {
1336                         ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1337                         return -1;
1338                 }
1339         }
1340
1341         ao2_lock(app->forwards);
1342         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1343         if (!forwards) {
1344                 ao2_unlock(app->forwards);
1345                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1346                 return -1;
1347         }
1348         forwards->interested--;
1349
1350         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1351         if (forwards->interested == 0 || terminate) {
1352                 /* No one is interested any more; unsubscribe */
1353                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1354                 forwards_unsubscribe(forwards);
1355                 ao2_find(app->forwards, forwards,
1356                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1357                         OBJ_NODATA);
1358
1359                 if (!strcmp(kind, "endpoint")) {
1360                         messaging_app_unsubscribe_endpoint(app->name, id);
1361                 }
1362         }
1363         ao2_unlock(app->forwards);
1364         ao2_ref(forwards, -1);
1365
1366         return 0;
1367 }
1368
1369 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1370 {
1371         if (!app) {
1372                 return -1;
1373         }
1374
1375         return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1376 }
1377
/*!
 * \brief Unsubscribe an application from a channel by id.
 *
 * \param app Application to unsubscribe; a NULL app is rejected.
 * \param channel_id Channel id, passed on to unsubscribe() unchanged.
 *
 * \retval -1 if \a app is NULL.
 * \return Otherwise the result of unsubscribe() for kind "channel".
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	return app ? unsubscribe(app, "channel", channel_id, 0) : -1;
}
1386
1387 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1388 {
1389         struct app_forwards *forwards;
1390
1391         if (ast_strlen_zero(channel_id)) {
1392                 channel_id = CHANNEL_ALL;
1393         }
1394         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1395         ao2_cleanup(forwards);
1396
1397         return forwards != NULL;
1398 }
1399
/*!
 * \brief Event-source lookup for channels.
 *
 * \param app Unused.
 * \param id Identifier handed to ast_channel_get_by_name().
 *
 * \return Whatever ast_channel_get_by_name() returns for \a id.
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	struct ast_channel *chan = ast_channel_get_by_name(id);

	return chan;
}
1404
1405 struct stasis_app_event_source channel_event_source = {
1406         .scheme = "channel:",
1407         .find = channel_find,
1408         .subscribe = subscribe_channel,
1409         .unsubscribe = app_unsubscribe_channel_id,
1410         .is_subscribed = app_is_subscribed_channel_id
1411 };
1412
1413 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1414 {
1415         struct app_forwards *forwards;
1416
1417         if (!app) {
1418                 return -1;
1419         }
1420
1421         ao2_lock(app->forwards);
1422         /* If subscribed to all, don't subscribe again */
1423         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1424         if (forwards) {
1425                 ao2_unlock(app->forwards);
1426                 ao2_ref(forwards, -1);
1427
1428                 return 0;
1429         }
1430
1431         forwards = ao2_find(app->forwards,
1432                 bridge ? bridge->uniqueid : BRIDGE_ALL,
1433                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1434         if (!forwards) {
1435                 int res;
1436
1437                 /* Forwards not found, create one */
1438                 forwards = forwards_create_bridge(app, bridge);
1439                 if (!forwards) {
1440                         ao2_unlock(app->forwards);
1441
1442                         return -1;
1443                 }
1444
1445                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1446                 if (!res) {
1447                         ao2_unlock(app->forwards);
1448                         ao2_ref(forwards, -1);
1449
1450                         return -1;
1451                 }
1452         }
1453
1454         ++forwards->interested;
1455         ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1456                 bridge ? bridge->uniqueid : "ALL",
1457                 forwards->interested,
1458                 app->name);
1459
1460         ao2_unlock(app->forwards);
1461         ao2_ref(forwards, -1);
1462
1463         return 0;
1464 }
1465
/*!
 * \brief Event-source adapter that forwards to app_subscribe_bridge().
 *
 * \param app Application to subscribe.
 * \param obj Bridge object, passed through as an untyped pointer.
 *
 * \return Whatever app_subscribe_bridge() returns.
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1470
1471 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1472 {
1473         if (!app) {
1474                 return -1;
1475         }
1476
1477         return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1478 }
1479
/*!
 * \brief Unsubscribe an application from a bridge by id.
 *
 * \param app Application to unsubscribe; a NULL app is rejected.
 * \param bridge_id Bridge id, passed on to unsubscribe() unchanged.
 *
 * \retval -1 if \a app is NULL.
 * \return Otherwise the result of unsubscribe() for kind "bridge".
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	return app ? unsubscribe(app, "bridge", bridge_id, 0) : -1;
}
1488
1489 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1490 {
1491         struct app_forwards *forwards;
1492
1493         if (ast_strlen_zero(bridge_id)) {
1494                 bridge_id = BRIDGE_ALL;
1495         }
1496
1497         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1498         ao2_cleanup(forwards);
1499
1500         return forwards != NULL;
1501 }
1502
/*!
 * \brief Event-source lookup for bridges.
 *
 * \param app Unused.
 * \param id Identifier handed to stasis_app_bridge_find_by_id().
 *
 * \return Whatever stasis_app_bridge_find_by_id() returns for \a id.
 */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	struct ast_bridge *bridge = stasis_app_bridge_find_by_id(id);

	return bridge;
}
1507
1508 struct stasis_app_event_source bridge_event_source = {
1509         .scheme = "bridge:",
1510         .find = bridge_find,
1511         .subscribe = subscribe_bridge,
1512         .unsubscribe = app_unsubscribe_bridge_id,
1513         .is_subscribed = app_is_subscribed_bridge_id
1514 };
1515
1516 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1517 {
1518         struct app_forwards *forwards;
1519
1520         if (!app) {
1521                 return -1;
1522         }
1523
1524         ao2_lock(app->forwards);
1525         /* If subscribed to all, don't subscribe again */
1526         forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1527         if (forwards) {
1528                 ao2_unlock(app->forwards);
1529                 ao2_ref(forwards, -1);
1530
1531                 return 0;
1532         }
1533
1534         forwards = ao2_find(app->forwards,
1535                 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1536                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1537         if (!forwards) {
1538                 int res;
1539
1540                 /* Forwards not found, create one */
1541                 forwards = forwards_create_endpoint(app, endpoint);
1542                 if (!forwards) {
1543                         ao2_unlock(app->forwards);
1544
1545                         return -1;
1546                 }
1547
1548                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1549                 if (!res) {
1550                         ao2_unlock(app->forwards);
1551                         ao2_ref(forwards, -1);
1552
1553                         return -1;
1554                 }
1555
1556                 /* Subscribe for messages */
1557                 messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1558         }
1559
1560         ++forwards->interested;
1561         ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1562                 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1563                 forwards->interested,
1564                 app->name);
1565
1566         ao2_unlock(app->forwards);
1567         ao2_ref(forwards, -1);
1568
1569         return 0;
1570 }
1571
/*!
 * \brief Event-source adapter that forwards to app_subscribe_endpoint().
 *
 * \param app Application to subscribe.
 * \param obj Endpoint object, passed through as an untyped pointer.
 *
 * \return Whatever app_subscribe_endpoint() returns.
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1576
/*!
 * \brief Unsubscribe an application from an endpoint by id.
 *
 * \param app Application to unsubscribe; a NULL app is rejected.
 * \param endpoint_id Endpoint id, passed on to unsubscribe() unchanged.
 *
 * \retval -1 if \a app is NULL.
 * \return Otherwise the result of unsubscribe() for kind "endpoint".
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	return app ? unsubscribe(app, "endpoint", endpoint_id, 0) : -1;
}
1585
1586 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1587 {
1588         struct app_forwards *forwards;
1589
1590         if (ast_strlen_zero(endpoint_id)) {
1591                 endpoint_id = ENDPOINT_ALL;
1592         }
1593         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1594         ao2_cleanup(forwards);
1595
1596         return forwards != NULL;
1597 }
1598
/*!
 * \brief Event-source lookup for endpoints.
 *
 * \param app Unused.
 * \param id Identifier handed to ast_endpoint_find_by_id().
 *
 * \return Whatever ast_endpoint_find_by_id() returns for \a id.
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	struct ast_endpoint *endpoint = ast_endpoint_find_by_id(id);

	return endpoint;
}
1603
1604 struct stasis_app_event_source endpoint_event_source = {
1605         .scheme = "endpoint:",
1606         .find = endpoint_find,
1607         .subscribe = subscribe_endpoint,
1608         .unsubscribe = app_unsubscribe_endpoint_id,
1609         .is_subscribed = app_is_subscribed_endpoint_id
1610 };
1611
1612 void stasis_app_register_event_sources(void)
1613 {
1614         stasis_app_register_event_source(&channel_event_source);
1615         stasis_app_register_event_source(&bridge_event_source);
1616         stasis_app_register_event_source(&endpoint_event_source);
1617 }
1618
1619 void stasis_app_unregister_event_sources(void)
1620 {
1621         stasis_app_unregister_event_source(&endpoint_event_source);
1622         stasis_app_unregister_event_source(&bridge_event_source);
1623         stasis_app_unregister_event_source(&channel_event_source);
1624 }
1625
1626 struct ast_json *stasis_app_event_filter_to_json(struct stasis_app *app, struct ast_json *json)
1627 {
1628         if (!app || !json) {
1629                 return json;
1630         }
1631
1632         ast_json_object_set(json, "events_allowed", app->events_allowed ?
1633                 ast_json_ref(app->events_allowed) : ast_json_array_create());
1634         ast_json_object_set(json, "events_disallowed", app->events_disallowed ?
1635                 ast_json_ref(app->events_disallowed) : ast_json_array_create());
1636
1637         return json;
1638 }
1639
1640 static int app_event_filter_set(struct stasis_app *app, struct ast_json **member,
1641         struct ast_json *filter, const char *filter_type)
1642 {
1643         if (filter && ast_json_typeof(filter) == AST_JSON_OBJECT) {
1644                 if (!ast_json_object_size(filter)) {
1645                         /* If no filters are specified then reset this filter type */
1646                         filter = NULL;
1647                 } else {
1648                         /* Otherwise try to get the filter array for this type */
1649                         filter = ast_json_object_get(filter, filter_type);
1650                         if (!filter) {
1651                                 /* A filter type exists, but not this one, so don't update */
1652                                 return 0;
1653                         }
1654                 }
1655         }
1656
1657         /* At this point the filter object should be an array */
1658         if (filter && ast_json_typeof(filter) != AST_JSON_ARRAY) {
1659                 ast_log(LOG_ERROR, "Invalid json type event filter - app: %s, filter: %s\n",
1660                                 app->name, filter_type);
1661                 return -1;
1662         }
1663
1664         if (filter) {
1665                 /* Confirm that at least the type names are specified */
1666                 struct ast_json *obj;
1667                 int i;
1668
1669                 for (i = 0; i < ast_json_array_size(filter) &&
1670                                  (obj = ast_json_array_get(filter, i)); ++i) {
1671
1672                         if (ast_strlen_zero(ast_json_object_string_get(obj, "type"))) {
1673                                 ast_log(LOG_ERROR, "Filter event must have a type - app: %s, "
1674                                                 "filter: %s\n", app->name, filter_type);
1675                                 return -1;
1676                         }
1677                 }
1678         }
1679
1680         ao2_lock(app);
1681         ast_json_unref(*member);
1682         *member = filter ? ast_json_ref(filter) : NULL;
1683         ao2_unlock(app);
1684
1685         return 0;
1686 }
1687
1688 static int app_events_allowed_set(struct stasis_app *app, struct ast_json *filter)
1689 {
1690         return app_event_filter_set(app, &app->events_allowed, filter, "allowed");
1691 }
1692
1693 static int app_events_disallowed_set(struct stasis_app *app, struct ast_json *filter)
1694 {
1695         return app_event_filter_set(app, &app->events_disallowed, filter, "disallowed");
1696 }
1697
/*!
 * \brief Apply an event filter specification to an application.
 *
 * The disallowed list is processed first; the allowed list is only
 * processed if that succeeded.
 *
 * \param app Application to update.
 * \param filter Filter specification.
 *
 * \retval 0 if both lists were processed successfully.
 * \retval 1 if either list was rejected.
 */
int stasis_app_event_filter_set(struct stasis_app *app, struct ast_json *filter)
{
	if (app_events_disallowed_set(app, filter)) {
		return 1;
	}

	return app_events_allowed_set(app, filter) ? 1 : 0;
}
1702
/*!
 * \brief Check whether an event's type appears in a filter array.
 *
 * \param array Filter array to search; may be NULL.
 * \param event Event whose "type" is compared.
 * \param empty Value to return when \a array is NULL or has no entries.
 *
 * \retval 1 if an entry has the same "type" as \a event.
 * \retval 0 if the array has entries but none of them match.
 * \return \a empty when there is nothing to search.
 */
static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
{
	int idx;

	if (!array || !ast_json_array_size(array)) {
		return empty;
	}

	for (idx = 0; idx < ast_json_array_size(array); ++idx) {
		struct ast_json *entry = ast_json_array_get(array, idx);

		if (!entry) {
			break;
		}

		if (ast_strings_equal(ast_json_object_string_get(entry, "type"),
				ast_json_object_string_get(event, "type"))) {
			return 1;
		}
	}

	return 0;
}
1723
1724 int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
1725 {
1726         struct stasis_app *app = stasis_app_get_by_name(app_name);
1727         int res;
1728
1729         if (!app) {
1730                 return 0;
1731         }
1732
1733         ao2_lock(app);
1734         res = !app_event_filter_matched(app->events_disallowed, event, 0) &&
1735                 app_event_filter_matched(app->events_allowed, event, 1);
1736         ao2_unlock(app);
1737         ao2_ref(app, -1);
1738
1739         return res;
1740 }