585eddaf8121ac4de646244734d6fb83706c35a0
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "app.h"
29 #include "control.h"
30 #include "messaging.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/cli.h"
34 #include "asterisk/stasis_app.h"
35 #include "asterisk/stasis_bridges.h"
36 #include "asterisk/stasis_channels.h"
37 #include "asterisk/stasis_endpoints.h"
38 #include "asterisk/stasis_message_router.h"
39
40 #define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
41 #define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
42 #define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
43
44 /*! Global debug flag.  No need for locking */
45 int global_debug;
46
47 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49 struct stasis_app {
50         /*! Aggregation topic for this application. */
51         struct stasis_topic *topic;
52         /*! Router for handling messages forwarded to \a topic. */
53         struct stasis_message_router *router;
54         /*! Router for handling messages to the bridge all \a topic. */
55         struct stasis_message_router *bridge_router;
56         /*! Optional router for handling endpoint messages in 'all' subscriptions */
57         struct stasis_message_router *endpoint_router;
58         /*! Container of the channel forwards to this app's topic. */
59         struct ao2_container *forwards;
60         /*! Callback function for this application. */
61         stasis_app_cb handler;
62         /*! Opaque data to hand to callback function. */
63         void *data;
64         /*! Subscription model for the application */
65         enum stasis_app_subscription_model subscription_model;
66         /*! Whether or not someone wants to see debug messages about this app */
67         int debug;
68         /*! Name of the Stasis application */
69         char name[];
70 };
71
72 enum forward_type {
73         FORWARD_CHANNEL,
74         FORWARD_BRIDGE,
75         FORWARD_ENDPOINT,
76 };
77
78 /*! Subscription info for a particular channel/bridge. */
79 struct app_forwards {
80         /*! Count of number of times this channel/bridge has been subscribed */
81         int interested;
82
83         /*! Forward for the regular topic */
84         struct stasis_forward *topic_forward;
85         /*! Forward for the caching topic */
86         struct stasis_forward *topic_cached_forward;
87
88         /* Type of object being forwarded */
89         enum forward_type forward_type;
90         /*! Unique id of the object being forwarded */
91         char id[];
92 };
93
94 static void forwards_dtor(void *obj)
95 {
96 #ifdef AST_DEVMODE
97         struct app_forwards *forwards = obj;
98 #endif /* AST_DEVMODE */
99
100         ast_assert(forwards->topic_forward == NULL);
101         ast_assert(forwards->topic_cached_forward == NULL);
102 }
103
104 static void forwards_unsubscribe(struct app_forwards *forwards)
105 {
106         stasis_forward_cancel(forwards->topic_forward);
107         forwards->topic_forward = NULL;
108         stasis_forward_cancel(forwards->topic_cached_forward);
109         forwards->topic_cached_forward = NULL;
110 }
111
112 static struct app_forwards *forwards_create(struct stasis_app *app,
113         const char *id)
114 {
115         struct app_forwards *forwards;
116
117         if (!app || ast_strlen_zero(id)) {
118                 return NULL;
119         }
120
121         forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
122         if (!forwards) {
123                 return NULL;
124         }
125
126         strcpy(forwards->id, id); /* SAFE */
127
128         return forwards;
129 }
130
131 /*! Forward a channel's topics to an app */
132 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
133         struct ast_channel *chan)
134 {
135         struct app_forwards *forwards;
136
137         if (!app) {
138                 return NULL;
139         }
140
141         forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
142         if (!forwards) {
143                 return NULL;
144         }
145
146         forwards->forward_type = FORWARD_CHANNEL;
147         forwards->topic_forward = stasis_forward_all(
148                 chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
149                 app->topic);
150
151         if (!forwards->topic_forward) {
152                 ao2_ref(forwards, -1);
153                 return NULL;
154         }
155
156         return forwards;
157 }
158
159 /*! Forward a bridge's topics to an app */
160 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
161         struct ast_bridge *bridge)
162 {
163         struct app_forwards *forwards;
164
165         if (!app) {
166                 return NULL;
167         }
168
169         forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
170         if (!forwards) {
171                 return NULL;
172         }
173
174         forwards->forward_type = FORWARD_BRIDGE;
175         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
176
177         if (!forwards->topic_forward && bridge) {
178                 forwards_unsubscribe(forwards);
179                 ao2_ref(forwards, -1);
180                 return NULL;
181         }
182
183         return forwards;
184 }
185
186 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
187         struct stasis_message *message)
188 {
189         struct stasis_app *app = data;
190
191         stasis_publish(app->topic, message);
192 }
193
194 /*! Forward a endpoint's topics to an app */
195 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
196         struct ast_endpoint *endpoint)
197 {
198         struct app_forwards *forwards;
199         int ret = 0;
200
201         if (!app) {
202                 return NULL;
203         }
204
205         forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
206         if (!forwards) {
207                 return NULL;
208         }
209
210         forwards->forward_type = FORWARD_ENDPOINT;
211         if (endpoint) {
212                 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
213                         app->topic);
214                 forwards->topic_cached_forward = stasis_forward_all(
215                         ast_endpoint_topic_cached(endpoint), app->topic);
216
217                 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
218                         /* Half-subscribed is a bad thing */
219                         forwards_unsubscribe(forwards);
220                         ao2_ref(forwards, -1);
221                         return NULL;
222                 }
223         } else {
224                 /* Since endpoint subscriptions also subscribe to channels, in the case
225                  * of all endpoint subscriptions, we only want messages for the endpoints.
226                  * As such, we route those particular messages and then re-publish them
227                  * on the app's topic.
228                  */
229                 ast_assert(app->endpoint_router == NULL);
230                 app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
231                 if (!app->endpoint_router) {
232                         forwards_unsubscribe(forwards);
233                         ao2_ref(forwards, -1);
234                         return NULL;
235                 }
236
237                 ret |= stasis_message_router_add(app->endpoint_router,
238                         ast_endpoint_state_type(), endpoint_state_cb, app);
239                 ret |= stasis_message_router_add(app->endpoint_router,
240                         ast_endpoint_contact_state_type(), endpoint_state_cb, app);
241
242                 if (ret) {
243                         ao2_ref(app->endpoint_router, -1);
244                         app->endpoint_router = NULL;
245                         ao2_ref(forwards, -1);
246                         return NULL;
247                 }
248         }
249
250         return forwards;
251 }
252
253 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
254 {
255         const struct app_forwards *object_left = obj_left;
256         const struct app_forwards *object_right = obj_right;
257         const char *right_key = obj_right;
258         int cmp;
259
260         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
261         case OBJ_POINTER:
262                 right_key = object_right->id;
263                 /* Fall through */
264         case OBJ_KEY:
265                 cmp = strcmp(object_left->id, right_key);
266                 break;
267         case OBJ_PARTIAL_KEY:
268                 /*
269                  * We could also use a partial key struct containing a length
270                  * so strlen() does not get called for every comparison instead.
271                  */
272                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
273                 break;
274         default:
275                 /* Sort can only work on something with a full or partial key. */
276                 ast_assert(0);
277                 cmp = 0;
278                 break;
279         }
280         return cmp;
281 }
282
283 static void app_dtor(void *obj)
284 {
285         struct stasis_app *app = obj;
286
287         ast_verb(1, "Destroying Stasis app %s\n", app->name);
288
289         ast_assert(app->router == NULL);
290         ast_assert(app->bridge_router == NULL);
291         ast_assert(app->endpoint_router == NULL);
292
293         ao2_cleanup(app->topic);
294         app->topic = NULL;
295         ao2_cleanup(app->forwards);
296         app->forwards = NULL;
297         ao2_cleanup(app->data);
298         app->data = NULL;
299 }
300
301 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
302 {
303         struct ast_multi_channel_blob *payload = stasis_message_data(message);
304         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
305         struct ast_channel *chan;
306
307         if (!snapshot) {
308                 return;
309         }
310
311         chan = ast_channel_get_by_name(snapshot->base->uniqueid);
312         if (!chan) {
313                 return;
314         }
315
316         app_subscribe_channel(app, chan);
317         ast_channel_unref(chan);
318 }
319
320 static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
321         struct stasis_message *message)
322 {
323         struct stasis_app *app = data;
324
325         if (stasis_subscription_final_message(sub, message)) {
326                 ao2_cleanup(app);
327         }
328 }
329
330 static void sub_default_handler(void *data, struct stasis_subscription *sub,
331         struct stasis_message *message)
332 {
333         struct stasis_app *app = data;
334         struct ast_json *json;
335
336         /* The dial type can be converted to JSON so it will always be passed
337          * here.
338          */
339         if (stasis_message_type(message) == ast_channel_dial_type()) {
340                 call_forwarded_handler(app, message);
341         }
342
343         /* By default, send any message that has a JSON representation */
344         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
345         if (!json) {
346                 return;
347         }
348
349         app_send(app, json);
350         ast_json_unref(json);
351 }
352
353 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
354 typedef struct ast_json *(*channel_snapshot_monitor)(
355         struct ast_channel_snapshot *old_snapshot,
356         struct ast_channel_snapshot *new_snapshot,
357         const struct timeval *tv);
358
359 static struct ast_json *simple_channel_event(
360         const char *type,
361         struct ast_channel_snapshot *snapshot,
362         const struct timeval *tv)
363 {
364         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
365
366         if (!json_channel) {
367                 return NULL;
368         }
369
370         return ast_json_pack("{s: s, s: o, s: o}",
371                 "type", type,
372                 "timestamp", ast_json_timeval(*tv, NULL),
373                 "channel", json_channel);
374 }
375
376 static struct ast_json *channel_created_event(
377         struct ast_channel_snapshot *snapshot,
378         const struct timeval *tv)
379 {
380         return simple_channel_event("ChannelCreated", snapshot, tv);
381 }
382
383 static struct ast_json *channel_destroyed_event(
384         struct ast_channel_snapshot *snapshot,
385         const struct timeval *tv)
386 {
387         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
388
389         if (!json_channel) {
390                 return NULL;
391         }
392
393         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
394                 "type", "ChannelDestroyed",
395                 "timestamp", ast_json_timeval(*tv, NULL),
396                 "cause", snapshot->hangup->cause,
397                 "cause_txt", ast_cause2str(snapshot->hangup->cause),
398                 "channel", json_channel);
399 }
400
401 static struct ast_json *channel_state_change_event(
402         struct ast_channel_snapshot *snapshot,
403         const struct timeval *tv)
404 {
405         return simple_channel_event("ChannelStateChange", snapshot, tv);
406 }
407
408 /*! \brief Handle channel state changes */
409 static struct ast_json *channel_state(
410         struct ast_channel_snapshot *old_snapshot,
411         struct ast_channel_snapshot *new_snapshot,
412         const struct timeval *tv)
413 {
414         struct ast_channel_snapshot *snapshot = new_snapshot ?
415                 new_snapshot : old_snapshot;
416
417         if (!old_snapshot) {
418                 return channel_created_event(snapshot, tv);
419         } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
420                 return channel_destroyed_event(snapshot, tv);
421         } else if (old_snapshot->state != new_snapshot->state) {
422                 return channel_state_change_event(snapshot, tv);
423         }
424
425         return NULL;
426 }
427
428 static struct ast_json *channel_dialplan(
429         struct ast_channel_snapshot *old_snapshot,
430         struct ast_channel_snapshot *new_snapshot,
431         const struct timeval *tv)
432 {
433         struct ast_json *json_channel;
434
435         /* No Newexten event on first channel snapshot */
436         if (!old_snapshot) {
437                 return NULL;
438         }
439
440         /* Empty application is not valid for a Newexten event */
441         if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
442                 return NULL;
443         }
444
445         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
446                 return NULL;
447         }
448
449         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
450         if (!json_channel) {
451                 return NULL;
452         }
453
454         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
455                 "type", "ChannelDialplan",
456                 "timestamp", ast_json_timeval(*tv, NULL),
457                 "dialplan_app", new_snapshot->dialplan->appl,
458                 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
459                 "channel", json_channel);
460 }
461
462 static struct ast_json *channel_callerid(
463         struct ast_channel_snapshot *old_snapshot,
464         struct ast_channel_snapshot *new_snapshot,
465         const struct timeval *tv)
466 {
467         struct ast_json *json_channel;
468
469         /* No NewCallerid event on first channel snapshot */
470         if (!old_snapshot) {
471                 return NULL;
472         }
473
474         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
475                 return NULL;
476         }
477
478         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
479         if (!json_channel) {
480                 return NULL;
481         }
482
483         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
484                 "type", "ChannelCallerId",
485                 "timestamp", ast_json_timeval(*tv, NULL),
486                 "caller_presentation", new_snapshot->caller->pres,
487                 "caller_presentation_txt", ast_describe_caller_presentation(
488                         new_snapshot->caller->pres),
489                 "channel", json_channel);
490 }
491
492 static struct ast_json *channel_connected_line(
493         struct ast_channel_snapshot *old_snapshot,
494         struct ast_channel_snapshot *new_snapshot,
495         const struct timeval *tv)
496 {
497         struct ast_json *json_channel;
498
499         /* No ChannelConnectedLine event on first channel snapshot */
500         if (!old_snapshot) {
501                 return NULL;
502         }
503
504         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
505                 return NULL;
506         }
507
508         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
509         if (!json_channel) {
510                 return NULL;
511         }
512
513         return ast_json_pack("{s: s, s: o, s: o}",
514                 "type", "ChannelConnectedLine",
515                 "timestamp", ast_json_timeval(*tv, NULL),
516                 "channel", json_channel);
517 }
518
519 static channel_snapshot_monitor channel_monitors[] = {
520         channel_state,
521         channel_dialplan,
522         channel_callerid,
523         channel_connected_line,
524 };
525
526 static void sub_channel_update_handler(void *data,
527         struct stasis_subscription *sub,
528         struct stasis_message *message)
529 {
530         struct stasis_app *app = data;
531         struct ast_channel_snapshot_update *update = stasis_message_data(message);
532         int i;
533
534         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
535                 struct ast_json *msg;
536
537                 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
538                         stasis_message_timestamp(message));
539                 if (msg) {
540                         app_send(app, msg);
541                         ast_json_unref(msg);
542                 }
543         }
544
545         if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
546                 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
547         }
548 }
549
550 static struct ast_json *simple_endpoint_event(
551         const char *type,
552         struct ast_endpoint_snapshot *snapshot,
553         const struct timeval *tv)
554 {
555         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
556
557         if (!json_endpoint) {
558                 return NULL;
559         }
560
561         return ast_json_pack("{s: s, s: o, s: o}",
562                 "type", type,
563                 "timestamp", ast_json_timeval(*tv, NULL),
564                 "endpoint", json_endpoint);
565 }
566
567 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
568 {
569         struct ast_endpoint_snapshot *snapshot;
570         struct ast_json *json_endpoint;
571         struct ast_json *message;
572         struct stasis_app *app = pvt;
573         char *tech;
574         char *resource;
575
576         tech = ast_strdupa(endpoint_id);
577         resource = strchr(tech, '/');
578         if (resource) {
579                 resource[0] = '\0';
580                 resource++;
581         }
582
583         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
584                 return -1;
585         }
586
587         snapshot = ast_endpoint_latest_snapshot(tech, resource);
588         if (!snapshot) {
589                 return -1;
590         }
591
592         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
593         ao2_ref(snapshot, -1);
594         if (!json_endpoint) {
595                 return -1;
596         }
597
598         message = ast_json_pack("{s: s, s: o, s: o, s: o}",
599                 "type", "TextMessageReceived",
600                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
601                 "endpoint", json_endpoint,
602                 "message", ast_json_ref(json_msg));
603         if (message) {
604                 app_send(app, message);
605                 ast_json_unref(message);
606         }
607
608         return 0;
609 }
610
611 static void sub_endpoint_update_handler(void *data,
612         struct stasis_subscription *sub,
613         struct stasis_message *message)
614 {
615         struct stasis_app *app = data;
616         struct stasis_cache_update *update;
617         struct ast_endpoint_snapshot *new_snapshot;
618         struct ast_endpoint_snapshot *old_snapshot;
619         const struct timeval *tv;
620
621         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
622
623         update = stasis_message_data(message);
624
625         ast_assert(update->type == ast_endpoint_snapshot_type());
626
627         new_snapshot = stasis_message_data(update->new_snapshot);
628         old_snapshot = stasis_message_data(update->old_snapshot);
629
630         if (new_snapshot) {
631                 struct ast_json *json;
632
633                 tv = stasis_message_timestamp(update->new_snapshot);
634
635                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
636                 if (!json) {
637                         return;
638                 }
639
640                 app_send(app, json);
641                 ast_json_unref(json);
642         }
643
644         if (!new_snapshot && old_snapshot) {
645                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
646         }
647 }
648
649 static struct ast_json *simple_bridge_event(
650         const char *type,
651         struct ast_bridge_snapshot *snapshot,
652         const struct timeval *tv)
653 {
654         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
655         if (!json_bridge) {
656                 return NULL;
657         }
658
659         return ast_json_pack("{s: s, s: o, s: o}",
660                 "type", type,
661                 "timestamp", ast_json_timeval(*tv, NULL),
662                 "bridge", json_bridge);
663 }
664
665 static void sub_bridge_update_handler(void *data,
666         struct stasis_subscription *sub,
667         struct stasis_message *message)
668 {
669         struct ast_json *json = NULL;
670         struct stasis_app *app = data;
671         struct ast_bridge_snapshot_update *update;
672         const struct timeval *tv;
673
674         update = stasis_message_data(message);
675
676         tv = stasis_message_timestamp(message);
677
678         if (!update->new_snapshot) {
679                 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
680         } else if (!update->old_snapshot) {
681                 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
682         } else if (update->new_snapshot && update->old_snapshot
683                 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
684                 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
685                 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
686                         ast_json_object_set(json, "old_video_source_id",
687                                 ast_json_string_create(update->old_snapshot->video_source_id));
688                 }
689         }
690
691         if (json) {
692                 app_send(app, json);
693                 ast_json_unref(json);
694         }
695
696         if (!update->new_snapshot && update->old_snapshot) {
697                 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
698         }
699 }
700
701
702 /*! \brief Helper function for determining if the application is subscribed to a given entity */
703 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
704 {
705         struct app_forwards *forwards = NULL;
706
707         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
708         if (!forwards) {
709                 return 0;
710         }
711
712         ao2_ref(forwards, -1);
713         return 1;
714 }
715
716 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
717         struct stasis_message *message)
718 {
719         struct stasis_app *app = data;
720         struct ast_bridge_merge_message *merge;
721
722         merge = stasis_message_data(message);
723
724         /* Find out if we're subscribed to either bridge */
725         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
726                 bridge_app_subscribed(app, merge->to->uniqueid)) {
727                 /* Forward the message to the app */
728                 stasis_publish(app->topic, message);
729         }
730 }
731
732 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
733 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
734 {
735         int subscribed = 0;
736         struct ao2_iterator iter;
737         char *uniqueid;
738
739         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
740                 return 1;
741         }
742
743         iter = ao2_iterator_init(snapshot->channels, 0);
744         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
745                 if (bridge_app_subscribed(app, uniqueid)) {
746                         subscribed = 1;
747                         ao2_ref(uniqueid, -1);
748                         break;
749                 }
750         }
751         ao2_iterator_destroy(&iter);
752
753         return subscribed;
754 }
755
756 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
757         struct stasis_message *message)
758 {
759         struct stasis_app *app = data;
760         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
761         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
762
763         if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
764                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
765                 stasis_publish(app->topic, message);
766         }
767 }
768
769 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
770         struct stasis_message *message)
771 {
772         struct stasis_app *app = data;
773         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
774         int subscribed = 0;
775
776         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->base->uniqueid);
777         if (!subscribed) {
778                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->base->uniqueid);
779         }
780         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
781                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
782         }
783         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
784                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
785         }
786
787         if (!subscribed) {
788                 switch (transfer_msg->dest_type) {
789                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
790                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
791                         break;
792                 case AST_ATTENDED_TRANSFER_DEST_LINK:
793                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->base->uniqueid);
794                         if (!subscribed) {
795                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->base->uniqueid);
796                         }
797                         break;
798                 break;
799                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
800                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
801                         if (!subscribed) {
802                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->base->uniqueid);
803                         }
804                         break;
805                 default:
806                         break;
807                 }
808         }
809
810         if (subscribed) {
811                 stasis_publish(app->topic, message);
812         }
813 }
814
815 static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
816         struct stasis_message *message)
817 {
818         struct stasis_app *app = data;
819
820         if (stasis_subscription_final_message(sub, message)) {
821                 ao2_cleanup(app);
822         }
823 }
824
825 void stasis_app_set_debug(struct stasis_app *app, int debug)
826 {
827         if (!app) {
828                 return;
829         }
830
831         app->debug = debug;
832 }
833
834 void stasis_app_set_debug_by_name(const char *app_name, int debug)
835 {
836         struct stasis_app *app = stasis_app_get_by_name(app_name);
837
838         if (!app) {
839                 return;
840         }
841
842         app->debug = debug;
843         ao2_cleanup(app);
844 }
845
846 int stasis_app_get_debug(struct stasis_app *app)
847 {
848         return (app ? app->debug : 0) || global_debug;
849 }
850
851 int stasis_app_get_debug_by_name(const char *app_name)
852 {
853         int debug_enabled = 0;
854
855         if (global_debug) {
856                 debug_enabled = 1;
857         } else {
858                 struct stasis_app *app = stasis_app_get_by_name(app_name);
859
860                 if (app) {
861                         if (app->debug) {
862                                 debug_enabled = 1;
863                         }
864                         ao2_ref(app, -1);
865                 }
866         }
867         return debug_enabled;
868 }
869
870 void stasis_app_set_global_debug(int debug)
871 {
872         global_debug = debug;
873         if (!global_debug) {
874                 struct ao2_container *app_names = stasis_app_get_all();
875                 struct ao2_iterator it_app_names;
876                 char *app_name;
877                 struct stasis_app *app;
878
879                 if (!app_names || !ao2_container_count(app_names)) {
880                         ao2_cleanup(app_names);
881                         return;
882                 }
883
884                 it_app_names = ao2_iterator_init(app_names, 0);
885                 while ((app_name = ao2_iterator_next(&it_app_names))) {
886                         if ((app = stasis_app_get_by_name(app_name))) {
887                                 stasis_app_set_debug(app, 0);
888                         }
889
890                         ao2_cleanup(app_name);
891                         ao2_cleanup(app);
892                 }
893                 ao2_iterator_cleanup(&it_app_names);
894                 ao2_cleanup(app_names);
895         }
896 }
897
898 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
899 {
900         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
901         size_t size;
902         int res = 0;
903
904         ast_assert(name != NULL);
905         ast_assert(handler != NULL);
906
907         ast_verb(1, "Creating Stasis app '%s'\n", name);
908
909         size = sizeof(*app) + strlen(name) + 1;
910         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
911         if (!app) {
912                 return NULL;
913         }
914         app->subscription_model = subscription_model;
915
916         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
917                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
918                 forwards_sort, NULL);
919         if (!app->forwards) {
920                 return NULL;
921         }
922
923         app->topic = stasis_topic_create(name);
924         if (!app->topic) {
925                 return NULL;
926         }
927
928         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
929         if (!app->bridge_router) {
930                 return NULL;
931         }
932
933         res |= stasis_message_router_add(app->bridge_router,
934                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
935
936         res |= stasis_message_router_add(app->bridge_router,
937                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
938
939         res |= stasis_message_router_add(app->bridge_router,
940                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
941
942         res |= stasis_message_router_add(app->bridge_router,
943                 stasis_subscription_change_type(), bridge_subscription_change_handler, app);
944
945         if (res != 0) {
946                 return NULL;
947         }
948         /* Bridge router holds a reference */
949         ao2_ref(app, +1);
950
951         app->router = stasis_message_router_create(app->topic);
952         if (!app->router) {
953                 return NULL;
954         }
955
956         res |= stasis_message_router_add(app->router,
957                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
958
959         res |= stasis_message_router_add(app->router,
960                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
961
962         res |= stasis_message_router_add_cache_update(app->router,
963                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
964
965         res |= stasis_message_router_add(app->router,
966                 stasis_subscription_change_type(), sub_subscription_change_handler, app);
967
968         stasis_message_router_set_formatters_default(app->router,
969                 sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
970
971         if (res != 0) {
972                 return NULL;
973         }
974         /* Router holds a reference */
975         ao2_ref(app, +1);
976
977         strncpy(app->name, name, size - sizeof(*app));
978         app->handler = handler;
979         app->data = ao2_bump(data);
980
981         ao2_ref(app, +1);
982         return app;
983 }
984
985 struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
986 {
987         return app->topic;
988 }
989
990 /*!
991  * \brief Send a message to the given application.
992  * \param app App to send the message to.
993  * \param message Message to send.
994  */
995 void app_send(struct stasis_app *app, struct ast_json *message)
996 {
997         stasis_app_cb handler;
998         char eid[20];
999         void *data;
1000
1001         if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
1002                         ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1003                 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1004                         ast_json_string_get(ast_json_object_get(message, "type")));
1005         }
1006
1007         /* Copy off mutable state with lock held */
1008         ao2_lock(app);
1009         handler = app->handler;
1010         data = ao2_bump(app->data);
1011         ao2_unlock(app);
1012         /* Name is immutable; no need to copy */
1013
1014         if (handler) {
1015                 handler(data, app->name, message);
1016         } else {
1017                 ast_verb(3,
1018                         "Inactive Stasis app '%s' missed message\n", app->name);
1019         }
1020         ao2_cleanup(data);
1021 }
1022
1023 void app_deactivate(struct stasis_app *app)
1024 {
1025         ao2_lock(app);
1026
1027         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1028         app->handler = NULL;
1029         ao2_cleanup(app->data);
1030         app->data = NULL;
1031
1032         ao2_unlock(app);
1033 }
1034
1035 void app_shutdown(struct stasis_app *app)
1036 {
1037         ao2_lock(app);
1038
1039         ast_assert(app_is_finished(app));
1040
1041         stasis_message_router_unsubscribe(app->router);
1042         app->router = NULL;
1043         stasis_message_router_unsubscribe(app->bridge_router);
1044         app->bridge_router = NULL;
1045         stasis_message_router_unsubscribe(app->endpoint_router);
1046         app->endpoint_router = NULL;
1047
1048         ao2_unlock(app);
1049 }
1050
1051 int app_is_active(struct stasis_app *app)
1052 {
1053         int ret;
1054
1055         ao2_lock(app);
1056         ret = app->handler != NULL;
1057         ao2_unlock(app);
1058
1059         return ret;
1060 }
1061
1062 int app_is_finished(struct stasis_app *app)
1063 {
1064         int ret;
1065
1066         ao2_lock(app);
1067         ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1068         ao2_unlock(app);
1069
1070         return ret;
1071 }
1072
1073 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
1074 {
1075         ao2_lock(app);
1076         if (app->handler && app->data) {
1077                 struct ast_json *msg;
1078
1079                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1080
1081                 msg = ast_json_pack("{s: s, s: s}",
1082                         "type", "ApplicationReplaced",
1083                         "application", app->name);
1084                 if (msg) {
1085                         app_send(app, msg);
1086                         ast_json_unref(msg);
1087                 }
1088         } else {
1089                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1090         }
1091
1092         app->handler = handler;
1093         ao2_replace(app->data, data);
1094         ao2_unlock(app);
1095 }
1096
1097 const char *stasis_app_name(const struct stasis_app *app)
1098 {
1099         return app->name;
1100 }
1101
1102 static int forwards_filter_by_type(void *obj, void *arg, int flags)
1103 {
1104         struct app_forwards *forward = obj;
1105         enum forward_type *forward_type = arg;
1106
1107         if (forward->forward_type == *forward_type) {
1108                 return CMP_MATCH;
1109         }
1110
1111         return 0;
1112 }
1113
1114 void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1115 {
1116         struct ao2_iterator *channels;
1117         struct ao2_iterator *endpoints;
1118         struct ao2_iterator *bridges;
1119         struct app_forwards *forward;
1120         enum forward_type forward_type;
1121
1122         ast_cli(a->fd, "Name: %s\n"
1123                 "  Debug: %s\n"
1124                 "  Subscription Model: %s\n",
1125                 app->name,
1126                 app->debug ? "Yes" : "No",
1127                 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1128                         "Global Resource Subscription" :
1129                         "Application/Explicit Resource Subscription");
1130         ast_cli(a->fd, "  Subscriptions: %d\n", ao2_container_count(app->forwards));
1131
1132         ast_cli(a->fd, "    Channels:\n");
1133         forward_type = FORWARD_CHANNEL;
1134         channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
1135                 forwards_filter_by_type, &forward_type);
1136         if (channels) {
1137                 while ((forward = ao2_iterator_next(channels))) {
1138                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1139                         ao2_ref(forward, -1);
1140                 }
1141                 ao2_iterator_destroy(channels);
1142         }
1143
1144         ast_cli(a->fd, "    Bridges:\n");
1145         forward_type = FORWARD_BRIDGE;
1146         bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1147                 forwards_filter_by_type, &forward_type);
1148         if (bridges) {
1149                 while ((forward = ao2_iterator_next(bridges))) {
1150                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1151                         ao2_ref(forward, -1);
1152                 }
1153                 ao2_iterator_destroy(bridges);
1154         }
1155
1156         ast_cli(a->fd, "    Endpoints:\n");
1157         forward_type = FORWARD_ENDPOINT;
1158         endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
1159                 forwards_filter_by_type, &forward_type);
1160         if (endpoints) {
1161                 while ((forward = ao2_iterator_next(endpoints))) {
1162                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1163                         ao2_ref(forward, -1);
1164                 }
1165                 ao2_iterator_destroy(endpoints);
1166         }
1167 }
1168
1169 struct ast_json *app_to_json(const struct stasis_app *app)
1170 {
1171         struct ast_json *json;
1172         struct ast_json *channels;
1173         struct ast_json *bridges;
1174         struct ast_json *endpoints;
1175         struct ao2_iterator i;
1176         struct app_forwards *forwards;
1177
1178         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1179                 "name", app->name,
1180                 "channel_ids", "bridge_ids", "endpoint_ids");
1181         if (!json) {
1182                 return NULL;
1183         }
1184         channels = ast_json_object_get(json, "channel_ids");
1185         bridges = ast_json_object_get(json, "bridge_ids");
1186         endpoints = ast_json_object_get(json, "endpoint_ids");
1187
1188         i = ao2_iterator_init(app->forwards, 0);
1189         while ((forwards = ao2_iterator_next(&i))) {
1190                 struct ast_json *array = NULL;
1191                 int append_res;
1192
1193                 switch (forwards->forward_type) {
1194                 case FORWARD_CHANNEL:
1195                         array = channels;
1196                         break;
1197                 case FORWARD_BRIDGE:
1198                         array = bridges;
1199                         break;
1200                 case FORWARD_ENDPOINT:
1201                         array = endpoints;
1202                         break;
1203                 }
1204
1205                 /* If forward_type value is unexpected this will safely return an error. */
1206                 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1207                 ao2_ref(forwards, -1);
1208
1209                 if (append_res != 0) {
1210                         ast_log(LOG_ERROR, "Error building response\n");
1211                         ao2_iterator_destroy(&i);
1212                         ast_json_unref(json);
1213
1214                         return NULL;
1215                 }
1216         }
1217         ao2_iterator_destroy(&i);
1218
1219         return json;
1220 }
1221
1222 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1223 {
1224         struct app_forwards *forwards;
1225
1226         if (!app) {
1227                 return -1;
1228         }
1229
1230         ao2_lock(app->forwards);
1231         /* If subscribed to all, don't subscribe again */
1232         forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1233         if (forwards) {
1234                 ao2_unlock(app->forwards);
1235                 ao2_ref(forwards, -1);
1236
1237                 return 0;
1238         }
1239
1240         forwards = ao2_find(app->forwards,
1241                 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1242                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1243         if (!forwards) {
1244                 int res;
1245
1246                 /* Forwards not found, create one */
1247                 forwards = forwards_create_channel(app, chan);
1248                 if (!forwards) {
1249                         ao2_unlock(app->forwards);
1250
1251                         return -1;
1252                 }
1253
1254                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1255                 if (!res) {
1256                         ao2_unlock(app->forwards);
1257                         ao2_ref(forwards, -1);
1258
1259                         return -1;
1260                 }
1261         }
1262
1263         ++forwards->interested;
1264         ast_debug(3, "Channel '%s' is %d interested in %s\n",
1265                 chan ? ast_channel_uniqueid(chan) : "ALL",
1266                 forwards->interested,
1267                 app->name);
1268
1269         ao2_unlock(app->forwards);
1270         ao2_ref(forwards, -1);
1271
1272         return 0;
1273 }
1274
1275 static int subscribe_channel(struct stasis_app *app, void *obj)
1276 {
1277         return app_subscribe_channel(app, obj);
1278 }
1279
1280 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1281 {
1282         struct app_forwards *forwards;
1283
1284         if (!id) {
1285                 if (!strcmp(kind, "bridge")) {
1286                         id = BRIDGE_ALL;
1287                 } else if (!strcmp(kind, "channel")) {
1288                         id = CHANNEL_ALL;
1289                 } else if (!strcmp(kind, "endpoint")) {
1290                         id = ENDPOINT_ALL;
1291                 } else {
1292                         ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1293                         return -1;
1294                 }
1295         }
1296
1297         ao2_lock(app->forwards);
1298         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1299         if (!forwards) {
1300                 ao2_unlock(app->forwards);
1301                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1302                 return -1;
1303         }
1304         forwards->interested--;
1305
1306         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1307         if (forwards->interested == 0 || terminate) {
1308                 /* No one is interested any more; unsubscribe */
1309                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1310                 forwards_unsubscribe(forwards);
1311                 ao2_find(app->forwards, forwards,
1312                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1313                         OBJ_NODATA);
1314
1315                 if (!strcmp(kind, "endpoint")) {
1316                         messaging_app_unsubscribe_endpoint(app->name, id);
1317                 }
1318         }
1319         ao2_unlock(app->forwards);
1320         ao2_ref(forwards, -1);
1321
1322         return 0;
1323 }
1324
1325 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1326 {
1327         if (!app) {
1328                 return -1;
1329         }
1330
1331         return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1332 }
1333
1334 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
1335 {
1336         if (!app) {
1337                 return -1;
1338         }
1339
1340         return unsubscribe(app, "channel", channel_id, 0);
1341 }
1342
1343 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1344 {
1345         struct app_forwards *forwards;
1346
1347         if (ast_strlen_zero(channel_id)) {
1348                 channel_id = CHANNEL_ALL;
1349         }
1350         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1351         ao2_cleanup(forwards);
1352
1353         return forwards != NULL;
1354 }
1355
1356 static void *channel_find(const struct stasis_app *app, const char *id)
1357 {
1358         return ast_channel_get_by_name(id);
1359 }
1360
1361 struct stasis_app_event_source channel_event_source = {
1362         .scheme = "channel:",
1363         .find = channel_find,
1364         .subscribe = subscribe_channel,
1365         .unsubscribe = app_unsubscribe_channel_id,
1366         .is_subscribed = app_is_subscribed_channel_id
1367 };
1368
1369 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1370 {
1371         struct app_forwards *forwards;
1372
1373         if (!app) {
1374                 return -1;
1375         }
1376
1377         ao2_lock(app->forwards);
1378         /* If subscribed to all, don't subscribe again */
1379         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1380         if (forwards) {
1381                 ao2_unlock(app->forwards);
1382                 ao2_ref(forwards, -1);
1383
1384                 return 0;
1385         }
1386
1387         forwards = ao2_find(app->forwards,
1388                 bridge ? bridge->uniqueid : BRIDGE_ALL,
1389                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1390         if (!forwards) {
1391                 int res;
1392
1393                 /* Forwards not found, create one */
1394                 forwards = forwards_create_bridge(app, bridge);
1395                 if (!forwards) {
1396                         ao2_unlock(app->forwards);
1397
1398                         return -1;
1399                 }
1400
1401                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1402                 if (!res) {
1403                         ao2_unlock(app->forwards);
1404                         ao2_ref(forwards, -1);
1405
1406                         return -1;
1407                 }
1408         }
1409
1410         ++forwards->interested;
1411         ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1412                 bridge ? bridge->uniqueid : "ALL",
1413                 forwards->interested,
1414                 app->name);
1415
1416         ao2_unlock(app->forwards);
1417         ao2_ref(forwards, -1);
1418
1419         return 0;
1420 }
1421
1422 static int subscribe_bridge(struct stasis_app *app, void *obj)
1423 {
1424         return app_subscribe_bridge(app, obj);
1425 }
1426
1427 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1428 {
1429         if (!app) {
1430                 return -1;
1431         }
1432
1433         return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1434 }
1435
1436 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
1437 {
1438         if (!app) {
1439                 return -1;
1440         }
1441
1442         return unsubscribe(app, "bridge", bridge_id, 0);
1443 }
1444
1445 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1446 {
1447         struct app_forwards *forwards;
1448
1449         if (ast_strlen_zero(bridge_id)) {
1450                 bridge_id = BRIDGE_ALL;
1451         }
1452
1453         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1454         ao2_cleanup(forwards);
1455
1456         return forwards != NULL;
1457 }
1458
1459 static void *bridge_find(const struct stasis_app *app, const char *id)
1460 {
1461         return stasis_app_bridge_find_by_id(id);
1462 }
1463
1464 struct stasis_app_event_source bridge_event_source = {
1465         .scheme = "bridge:",
1466         .find = bridge_find,
1467         .subscribe = subscribe_bridge,
1468         .unsubscribe = app_unsubscribe_bridge_id,
1469         .is_subscribed = app_is_subscribed_bridge_id
1470 };
1471
1472 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1473 {
1474         struct app_forwards *forwards;
1475
1476         if (!app) {
1477                 return -1;
1478         }
1479
1480         ao2_lock(app->forwards);
1481         /* If subscribed to all, don't subscribe again */
1482         forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1483         if (forwards) {
1484                 ao2_unlock(app->forwards);
1485                 ao2_ref(forwards, -1);
1486
1487                 return 0;
1488         }
1489
1490         forwards = ao2_find(app->forwards,
1491                 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1492                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1493         if (!forwards) {
1494                 int res;
1495
1496                 /* Forwards not found, create one */
1497                 forwards = forwards_create_endpoint(app, endpoint);
1498                 if (!forwards) {
1499                         ao2_unlock(app->forwards);
1500
1501                         return -1;
1502                 }
1503
1504                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1505                 if (!res) {
1506                         ao2_unlock(app->forwards);
1507                         ao2_ref(forwards, -1);
1508
1509                         return -1;
1510                 }
1511
1512                 /* Subscribe for messages */
1513                 messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1514         }
1515
1516         ++forwards->interested;
1517         ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1518                 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1519                 forwards->interested,
1520                 app->name);
1521
1522         ao2_unlock(app->forwards);
1523         ao2_ref(forwards, -1);
1524
1525         return 0;
1526 }
1527
1528 static int subscribe_endpoint(struct stasis_app *app, void *obj)
1529 {
1530         return app_subscribe_endpoint(app, obj);
1531 }
1532
1533 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1534 {
1535         if (!app) {
1536                 return -1;
1537         }
1538
1539         return unsubscribe(app, "endpoint", endpoint_id, 0);
1540 }
1541
1542 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1543 {
1544         struct app_forwards *forwards;
1545
1546         if (ast_strlen_zero(endpoint_id)) {
1547                 endpoint_id = ENDPOINT_ALL;
1548         }
1549         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1550         ao2_cleanup(forwards);
1551
1552         return forwards != NULL;
1553 }
1554
1555 static void *endpoint_find(const struct stasis_app *app, const char *id)
1556 {
1557         return ast_endpoint_find_by_id(id);
1558 }
1559
1560 struct stasis_app_event_source endpoint_event_source = {
1561         .scheme = "endpoint:",
1562         .find = endpoint_find,
1563         .subscribe = subscribe_endpoint,
1564         .unsubscribe = app_unsubscribe_endpoint_id,
1565         .is_subscribed = app_is_subscribed_endpoint_id
1566 };
1567
1568 void stasis_app_register_event_sources(void)
1569 {
1570         stasis_app_register_event_source(&channel_event_source);
1571         stasis_app_register_event_source(&bridge_event_source);
1572         stasis_app_register_event_source(&endpoint_event_source);
1573 }
1574
1575 void stasis_app_unregister_event_sources(void)
1576 {
1577         stasis_app_unregister_event_source(&endpoint_event_source);
1578         stasis_app_unregister_event_source(&bridge_event_source);
1579         stasis_app_unregister_event_source(&channel_event_source);
1580 }