9317e087dab9744e02954d51da876c96df0f2d14
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "app.h"
29 #include "control.h"
30 #include "messaging.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/cli.h"
34 #include "asterisk/stasis_app.h"
35 #include "asterisk/stasis_bridges.h"
36 #include "asterisk/stasis_channels.h"
37 #include "asterisk/stasis_endpoints.h"
38 #include "asterisk/stasis_message_router.h"
39
40 #define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
41 #define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
42 #define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
43
44 /*! Global debug flag.  No need for locking */
45 int global_debug;
46
47 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49 struct stasis_app {
50         /*! Aggregation topic for this application. */
51         struct stasis_topic *topic;
52         /*! Router for handling messages forwarded to \a topic. */
53         struct stasis_message_router *router;
54         /*! Router for handling messages to the bridge all \a topic. */
55         struct stasis_message_router *bridge_router;
56         /*! Optional router for handling endpoint messages in 'all' subscriptions */
57         struct stasis_message_router *endpoint_router;
58         /*! Container of the channel forwards to this app's topic. */
59         struct ao2_container *forwards;
60         /*! Callback function for this application. */
61         stasis_app_cb handler;
62         /*! Opaque data to hand to callback function. */
63         void *data;
64         /*! Subscription model for the application */
65         enum stasis_app_subscription_model subscription_model;
66         /*! Whether or not someone wants to see debug messages about this app */
67         int debug;
68         /*! Name of the Stasis application */
69         char name[];
70 };
71
72 enum forward_type {
73         FORWARD_CHANNEL,
74         FORWARD_BRIDGE,
75         FORWARD_ENDPOINT,
76 };
77
78 /*! Subscription info for a particular channel/bridge. */
79 struct app_forwards {
80         /*! Count of number of times this channel/bridge has been subscribed */
81         int interested;
82
83         /*! Forward for the regular topic */
84         struct stasis_forward *topic_forward;
85         /*! Forward for the caching topic */
86         struct stasis_forward *topic_cached_forward;
87
88         /* Type of object being forwarded */
89         enum forward_type forward_type;
90         /*! Unique id of the object being forwarded */
91         char id[];
92 };
93
94 static void forwards_dtor(void *obj)
95 {
96 #ifdef AST_DEVMODE
97         struct app_forwards *forwards = obj;
98 #endif /* AST_DEVMODE */
99
100         ast_assert(forwards->topic_forward == NULL);
101         ast_assert(forwards->topic_cached_forward == NULL);
102 }
103
104 static void forwards_unsubscribe(struct app_forwards *forwards)
105 {
106         stasis_forward_cancel(forwards->topic_forward);
107         forwards->topic_forward = NULL;
108         stasis_forward_cancel(forwards->topic_cached_forward);
109         forwards->topic_cached_forward = NULL;
110 }
111
112 static struct app_forwards *forwards_create(struct stasis_app *app,
113         const char *id)
114 {
115         struct app_forwards *forwards;
116
117         if (!app || ast_strlen_zero(id)) {
118                 return NULL;
119         }
120
121         forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
122         if (!forwards) {
123                 return NULL;
124         }
125
126         strcpy(forwards->id, id); /* SAFE */
127
128         return forwards;
129 }
130
131 /*! Forward a channel's topics to an app */
132 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
133         struct ast_channel *chan)
134 {
135         struct app_forwards *forwards;
136
137         if (!app) {
138                 return NULL;
139         }
140
141         forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
142         if (!forwards) {
143                 return NULL;
144         }
145
146         forwards->forward_type = FORWARD_CHANNEL;
147         forwards->topic_forward = stasis_forward_all(
148                 chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
149                 app->topic);
150
151         if (!forwards->topic_forward) {
152                 ao2_ref(forwards, -1);
153                 return NULL;
154         }
155
156         return forwards;
157 }
158
159 /*! Forward a bridge's topics to an app */
160 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
161         struct ast_bridge *bridge)
162 {
163         struct app_forwards *forwards;
164
165         if (!app) {
166                 return NULL;
167         }
168
169         forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
170         if (!forwards) {
171                 return NULL;
172         }
173
174         forwards->forward_type = FORWARD_BRIDGE;
175         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
176
177         if (!forwards->topic_forward && bridge) {
178                 forwards_unsubscribe(forwards);
179                 ao2_ref(forwards, -1);
180                 return NULL;
181         }
182
183         return forwards;
184 }
185
186 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
187         struct stasis_message *message)
188 {
189         struct stasis_app *app = data;
190
191         stasis_publish(app->topic, message);
192 }
193
194 /*! Forward a endpoint's topics to an app */
195 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
196         struct ast_endpoint *endpoint)
197 {
198         struct app_forwards *forwards;
199         int ret = 0;
200
201         if (!app) {
202                 return NULL;
203         }
204
205         forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
206         if (!forwards) {
207                 return NULL;
208         }
209
210         forwards->forward_type = FORWARD_ENDPOINT;
211         if (endpoint) {
212                 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
213                         app->topic);
214                 forwards->topic_cached_forward = stasis_forward_all(
215                         ast_endpoint_topic_cached(endpoint), app->topic);
216
217                 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
218                         /* Half-subscribed is a bad thing */
219                         forwards_unsubscribe(forwards);
220                         ao2_ref(forwards, -1);
221                         return NULL;
222                 }
223         } else {
224                 /* Since endpoint subscriptions also subscribe to channels, in the case
225                  * of all endpoint subscriptions, we only want messages for the endpoints.
226                  * As such, we route those particular messages and then re-publish them
227                  * on the app's topic.
228                  */
229                 ast_assert(app->endpoint_router == NULL);
230                 app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
231                 if (!app->endpoint_router) {
232                         forwards_unsubscribe(forwards);
233                         ao2_ref(forwards, -1);
234                         return NULL;
235                 }
236
237                 ret |= stasis_message_router_add(app->endpoint_router,
238                         ast_endpoint_state_type(), endpoint_state_cb, app);
239                 ret |= stasis_message_router_add(app->endpoint_router,
240                         ast_endpoint_contact_state_type(), endpoint_state_cb, app);
241
242                 if (ret) {
243                         ao2_ref(app->endpoint_router, -1);
244                         app->endpoint_router = NULL;
245                         ao2_ref(forwards, -1);
246                         return NULL;
247                 }
248         }
249
250         return forwards;
251 }
252
253 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
254 {
255         const struct app_forwards *object_left = obj_left;
256         const struct app_forwards *object_right = obj_right;
257         const char *right_key = obj_right;
258         int cmp;
259
260         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
261         case OBJ_POINTER:
262                 right_key = object_right->id;
263                 /* Fall through */
264         case OBJ_KEY:
265                 cmp = strcmp(object_left->id, right_key);
266                 break;
267         case OBJ_PARTIAL_KEY:
268                 /*
269                  * We could also use a partial key struct containing a length
270                  * so strlen() does not get called for every comparison instead.
271                  */
272                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
273                 break;
274         default:
275                 /* Sort can only work on something with a full or partial key. */
276                 ast_assert(0);
277                 cmp = 0;
278                 break;
279         }
280         return cmp;
281 }
282
283 static void app_dtor(void *obj)
284 {
285         struct stasis_app *app = obj;
286         size_t size = strlen("stasis-") + strlen(app->name) + 1;
287         char context_name[size];
288
289         ast_verb(1, "Destroying Stasis app %s\n", app->name);
290
291         ast_assert(app->router == NULL);
292         ast_assert(app->bridge_router == NULL);
293         ast_assert(app->endpoint_router == NULL);
294
295         /* If we created a context for this application, remove it */
296         strcpy(context_name, "stasis-");
297         strcat(context_name, app->name);
298         ast_context_destroy_by_name(context_name, "res_stasis");
299
300         ao2_cleanup(app->topic);
301         app->topic = NULL;
302         ao2_cleanup(app->forwards);
303         app->forwards = NULL;
304         ao2_cleanup(app->data);
305         app->data = NULL;
306 }
307
308 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
309 {
310         struct ast_multi_channel_blob *payload = stasis_message_data(message);
311         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
312         struct ast_channel *chan;
313
314         if (!snapshot) {
315                 return;
316         }
317
318         chan = ast_channel_get_by_name(snapshot->base->uniqueid);
319         if (!chan) {
320                 return;
321         }
322
323         app_subscribe_channel(app, chan);
324         ast_channel_unref(chan);
325 }
326
327 static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
328         struct stasis_message *message)
329 {
330         struct stasis_app *app = data;
331
332         if (stasis_subscription_final_message(sub, message)) {
333                 ao2_cleanup(app);
334         }
335 }
336
337 static void sub_default_handler(void *data, struct stasis_subscription *sub,
338         struct stasis_message *message)
339 {
340         struct stasis_app *app = data;
341         struct ast_json *json;
342
343         /* The dial type can be converted to JSON so it will always be passed
344          * here.
345          */
346         if (stasis_message_type(message) == ast_channel_dial_type()) {
347                 call_forwarded_handler(app, message);
348         }
349
350         /* By default, send any message that has a JSON representation */
351         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
352         if (!json) {
353                 return;
354         }
355
356         app_send(app, json);
357         ast_json_unref(json);
358 }
359
360 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
361 typedef struct ast_json *(*channel_snapshot_monitor)(
362         struct ast_channel_snapshot *old_snapshot,
363         struct ast_channel_snapshot *new_snapshot,
364         const struct timeval *tv);
365
366 static struct ast_json *simple_channel_event(
367         const char *type,
368         struct ast_channel_snapshot *snapshot,
369         const struct timeval *tv)
370 {
371         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
372
373         if (!json_channel) {
374                 return NULL;
375         }
376
377         return ast_json_pack("{s: s, s: o, s: o}",
378                 "type", type,
379                 "timestamp", ast_json_timeval(*tv, NULL),
380                 "channel", json_channel);
381 }
382
383 static struct ast_json *channel_created_event(
384         struct ast_channel_snapshot *snapshot,
385         const struct timeval *tv)
386 {
387         return simple_channel_event("ChannelCreated", snapshot, tv);
388 }
389
390 static struct ast_json *channel_destroyed_event(
391         struct ast_channel_snapshot *snapshot,
392         const struct timeval *tv)
393 {
394         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
395
396         if (!json_channel) {
397                 return NULL;
398         }
399
400         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
401                 "type", "ChannelDestroyed",
402                 "timestamp", ast_json_timeval(*tv, NULL),
403                 "cause", snapshot->hangup->cause,
404                 "cause_txt", ast_cause2str(snapshot->hangup->cause),
405                 "channel", json_channel);
406 }
407
408 static struct ast_json *channel_state_change_event(
409         struct ast_channel_snapshot *snapshot,
410         const struct timeval *tv)
411 {
412         return simple_channel_event("ChannelStateChange", snapshot, tv);
413 }
414
415 /*! \brief Handle channel state changes */
416 static struct ast_json *channel_state(
417         struct ast_channel_snapshot *old_snapshot,
418         struct ast_channel_snapshot *new_snapshot,
419         const struct timeval *tv)
420 {
421         struct ast_channel_snapshot *snapshot = new_snapshot ?
422                 new_snapshot : old_snapshot;
423
424         if (!old_snapshot) {
425                 return channel_created_event(snapshot, tv);
426         } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
427                 return channel_destroyed_event(snapshot, tv);
428         } else if (old_snapshot->state != new_snapshot->state) {
429                 return channel_state_change_event(snapshot, tv);
430         }
431
432         return NULL;
433 }
434
435 static struct ast_json *channel_dialplan(
436         struct ast_channel_snapshot *old_snapshot,
437         struct ast_channel_snapshot *new_snapshot,
438         const struct timeval *tv)
439 {
440         struct ast_json *json_channel;
441
442         /* No Newexten event on first channel snapshot */
443         if (!old_snapshot) {
444                 return NULL;
445         }
446
447         /* Empty application is not valid for a Newexten event */
448         if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
449                 return NULL;
450         }
451
452         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
453                 return NULL;
454         }
455
456         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
457         if (!json_channel) {
458                 return NULL;
459         }
460
461         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
462                 "type", "ChannelDialplan",
463                 "timestamp", ast_json_timeval(*tv, NULL),
464                 "dialplan_app", new_snapshot->dialplan->appl,
465                 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
466                 "channel", json_channel);
467 }
468
469 static struct ast_json *channel_callerid(
470         struct ast_channel_snapshot *old_snapshot,
471         struct ast_channel_snapshot *new_snapshot,
472         const struct timeval *tv)
473 {
474         struct ast_json *json_channel;
475
476         /* No NewCallerid event on first channel snapshot */
477         if (!old_snapshot) {
478                 return NULL;
479         }
480
481         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
482                 return NULL;
483         }
484
485         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
486         if (!json_channel) {
487                 return NULL;
488         }
489
490         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
491                 "type", "ChannelCallerId",
492                 "timestamp", ast_json_timeval(*tv, NULL),
493                 "caller_presentation", new_snapshot->caller->pres,
494                 "caller_presentation_txt", ast_describe_caller_presentation(
495                         new_snapshot->caller->pres),
496                 "channel", json_channel);
497 }
498
499 static struct ast_json *channel_connected_line(
500         struct ast_channel_snapshot *old_snapshot,
501         struct ast_channel_snapshot *new_snapshot,
502         const struct timeval *tv)
503 {
504         struct ast_json *json_channel;
505
506         /* No ChannelConnectedLine event on first channel snapshot */
507         if (!old_snapshot) {
508                 return NULL;
509         }
510
511         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
512                 return NULL;
513         }
514
515         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
516         if (!json_channel) {
517                 return NULL;
518         }
519
520         return ast_json_pack("{s: s, s: o, s: o}",
521                 "type", "ChannelConnectedLine",
522                 "timestamp", ast_json_timeval(*tv, NULL),
523                 "channel", json_channel);
524 }
525
526 static channel_snapshot_monitor channel_monitors[] = {
527         channel_state,
528         channel_dialplan,
529         channel_callerid,
530         channel_connected_line,
531 };
532
533 static void sub_channel_update_handler(void *data,
534         struct stasis_subscription *sub,
535         struct stasis_message *message)
536 {
537         struct stasis_app *app = data;
538         struct ast_channel_snapshot_update *update = stasis_message_data(message);
539         int i;
540
541         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
542                 struct ast_json *msg;
543
544                 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
545                         stasis_message_timestamp(message));
546                 if (msg) {
547                         app_send(app, msg);
548                         ast_json_unref(msg);
549                 }
550         }
551
552         if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
553                 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
554         }
555 }
556
557 static struct ast_json *simple_endpoint_event(
558         const char *type,
559         struct ast_endpoint_snapshot *snapshot,
560         const struct timeval *tv)
561 {
562         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
563
564         if (!json_endpoint) {
565                 return NULL;
566         }
567
568         return ast_json_pack("{s: s, s: o, s: o}",
569                 "type", type,
570                 "timestamp", ast_json_timeval(*tv, NULL),
571                 "endpoint", json_endpoint);
572 }
573
574 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
575 {
576         struct ast_endpoint_snapshot *snapshot;
577         struct ast_json *json_endpoint;
578         struct ast_json *message;
579         struct stasis_app *app = pvt;
580         char *tech;
581         char *resource;
582
583         tech = ast_strdupa(endpoint_id);
584         resource = strchr(tech, '/');
585         if (resource) {
586                 resource[0] = '\0';
587                 resource++;
588         }
589
590         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
591                 return -1;
592         }
593
594         snapshot = ast_endpoint_latest_snapshot(tech, resource);
595         if (!snapshot) {
596                 return -1;
597         }
598
599         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
600         ao2_ref(snapshot, -1);
601         if (!json_endpoint) {
602                 return -1;
603         }
604
605         message = ast_json_pack("{s: s, s: o, s: o, s: o}",
606                 "type", "TextMessageReceived",
607                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
608                 "endpoint", json_endpoint,
609                 "message", ast_json_ref(json_msg));
610         if (message) {
611                 app_send(app, message);
612                 ast_json_unref(message);
613         }
614
615         return 0;
616 }
617
618 static void sub_endpoint_update_handler(void *data,
619         struct stasis_subscription *sub,
620         struct stasis_message *message)
621 {
622         struct stasis_app *app = data;
623         struct stasis_cache_update *update;
624         struct ast_endpoint_snapshot *new_snapshot;
625         struct ast_endpoint_snapshot *old_snapshot;
626         const struct timeval *tv;
627
628         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
629
630         update = stasis_message_data(message);
631
632         ast_assert(update->type == ast_endpoint_snapshot_type());
633
634         new_snapshot = stasis_message_data(update->new_snapshot);
635         old_snapshot = stasis_message_data(update->old_snapshot);
636
637         if (new_snapshot) {
638                 struct ast_json *json;
639
640                 tv = stasis_message_timestamp(update->new_snapshot);
641
642                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
643                 if (!json) {
644                         return;
645                 }
646
647                 app_send(app, json);
648                 ast_json_unref(json);
649         }
650
651         if (!new_snapshot && old_snapshot) {
652                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
653         }
654 }
655
656 static struct ast_json *simple_bridge_event(
657         const char *type,
658         struct ast_bridge_snapshot *snapshot,
659         const struct timeval *tv)
660 {
661         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
662         if (!json_bridge) {
663                 return NULL;
664         }
665
666         return ast_json_pack("{s: s, s: o, s: o}",
667                 "type", type,
668                 "timestamp", ast_json_timeval(*tv, NULL),
669                 "bridge", json_bridge);
670 }
671
672 static void sub_bridge_update_handler(void *data,
673         struct stasis_subscription *sub,
674         struct stasis_message *message)
675 {
676         struct ast_json *json = NULL;
677         struct stasis_app *app = data;
678         struct ast_bridge_snapshot_update *update;
679         const struct timeval *tv;
680
681         update = stasis_message_data(message);
682
683         tv = stasis_message_timestamp(message);
684
685         if (!update->new_snapshot) {
686                 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
687         } else if (!update->old_snapshot) {
688                 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
689         } else if (update->new_snapshot && update->old_snapshot
690                 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
691                 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
692                 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
693                         ast_json_object_set(json, "old_video_source_id",
694                                 ast_json_string_create(update->old_snapshot->video_source_id));
695                 }
696         }
697
698         if (json) {
699                 app_send(app, json);
700                 ast_json_unref(json);
701         }
702
703         if (!update->new_snapshot && update->old_snapshot) {
704                 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
705         }
706 }
707
708
709 /*! \brief Helper function for determining if the application is subscribed to a given entity */
710 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
711 {
712         struct app_forwards *forwards = NULL;
713
714         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
715         if (!forwards) {
716                 return 0;
717         }
718
719         ao2_ref(forwards, -1);
720         return 1;
721 }
722
723 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
724         struct stasis_message *message)
725 {
726         struct stasis_app *app = data;
727         struct ast_bridge_merge_message *merge;
728
729         merge = stasis_message_data(message);
730
731         /* Find out if we're subscribed to either bridge */
732         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
733                 bridge_app_subscribed(app, merge->to->uniqueid)) {
734                 /* Forward the message to the app */
735                 stasis_publish(app->topic, message);
736         }
737 }
738
739 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
740 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
741 {
742         int subscribed = 0;
743         struct ao2_iterator iter;
744         char *uniqueid;
745
746         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
747                 return 1;
748         }
749
750         iter = ao2_iterator_init(snapshot->channels, 0);
751         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
752                 if (bridge_app_subscribed(app, uniqueid)) {
753                         subscribed = 1;
754                         ao2_ref(uniqueid, -1);
755                         break;
756                 }
757         }
758         ao2_iterator_destroy(&iter);
759
760         return subscribed;
761 }
762
763 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
764         struct stasis_message *message)
765 {
766         struct stasis_app *app = data;
767         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
768         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
769
770         if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
771                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
772                 stasis_publish(app->topic, message);
773         }
774 }
775
776 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
777         struct stasis_message *message)
778 {
779         struct stasis_app *app = data;
780         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
781         int subscribed = 0;
782
783         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->base->uniqueid);
784         if (!subscribed) {
785                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->base->uniqueid);
786         }
787         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
788                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
789         }
790         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
791                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
792         }
793
794         if (!subscribed) {
795                 switch (transfer_msg->dest_type) {
796                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
797                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
798                         break;
799                 case AST_ATTENDED_TRANSFER_DEST_LINK:
800                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->base->uniqueid);
801                         if (!subscribed) {
802                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->base->uniqueid);
803                         }
804                         break;
805                 break;
806                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
807                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
808                         if (!subscribed) {
809                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->base->uniqueid);
810                         }
811                         break;
812                 default:
813                         break;
814                 }
815         }
816
817         if (subscribed) {
818                 stasis_publish(app->topic, message);
819         }
820 }
821
822 static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
823         struct stasis_message *message)
824 {
825         struct stasis_app *app = data;
826
827         if (stasis_subscription_final_message(sub, message)) {
828                 ao2_cleanup(app);
829         }
830 }
831
832 void stasis_app_set_debug(struct stasis_app *app, int debug)
833 {
834         if (!app) {
835                 return;
836         }
837
838         app->debug = debug;
839 }
840
841 void stasis_app_set_debug_by_name(const char *app_name, int debug)
842 {
843         struct stasis_app *app = stasis_app_get_by_name(app_name);
844
845         if (!app) {
846                 return;
847         }
848
849         app->debug = debug;
850         ao2_cleanup(app);
851 }
852
853 int stasis_app_get_debug(struct stasis_app *app)
854 {
855         return (app ? app->debug : 0) || global_debug;
856 }
857
858 int stasis_app_get_debug_by_name(const char *app_name)
859 {
860         int debug_enabled = 0;
861
862         if (global_debug) {
863                 debug_enabled = 1;
864         } else {
865                 struct stasis_app *app = stasis_app_get_by_name(app_name);
866
867                 if (app) {
868                         if (app->debug) {
869                                 debug_enabled = 1;
870                         }
871                         ao2_ref(app, -1);
872                 }
873         }
874         return debug_enabled;
875 }
876
877 void stasis_app_set_global_debug(int debug)
878 {
879         global_debug = debug;
880         if (!global_debug) {
881                 struct ao2_container *app_names = stasis_app_get_all();
882                 struct ao2_iterator it_app_names;
883                 char *app_name;
884                 struct stasis_app *app;
885
886                 if (!app_names || !ao2_container_count(app_names)) {
887                         ao2_cleanup(app_names);
888                         return;
889                 }
890
891                 it_app_names = ao2_iterator_init(app_names, 0);
892                 while ((app_name = ao2_iterator_next(&it_app_names))) {
893                         if ((app = stasis_app_get_by_name(app_name))) {
894                                 stasis_app_set_debug(app, 0);
895                         }
896
897                         ao2_cleanup(app_name);
898                         ao2_cleanup(app);
899                 }
900                 ao2_iterator_cleanup(&it_app_names);
901                 ao2_cleanup(app_names);
902         }
903 }
904
905 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
906 {
907         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
908         size_t size;
909         int res = 0;
910         size_t context_size = strlen("stasis-") + strlen(name) + 1;
911         char context_name[context_size];
912
913         ast_assert(name != NULL);
914         ast_assert(handler != NULL);
915
916         ast_verb(1, "Creating Stasis app '%s'\n", name);
917
918         size = sizeof(*app) + strlen(name) + 1;
919         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
920         if (!app) {
921                 return NULL;
922         }
923         app->subscription_model = subscription_model;
924
925         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
926                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
927                 forwards_sort, NULL);
928         if (!app->forwards) {
929                 return NULL;
930         }
931
932         app->topic = stasis_topic_create(name);
933         if (!app->topic) {
934                 return NULL;
935         }
936
937         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
938         if (!app->bridge_router) {
939                 return NULL;
940         }
941
942         res |= stasis_message_router_add(app->bridge_router,
943                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
944
945         res |= stasis_message_router_add(app->bridge_router,
946                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
947
948         res |= stasis_message_router_add(app->bridge_router,
949                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
950
951         res |= stasis_message_router_add(app->bridge_router,
952                 stasis_subscription_change_type(), bridge_subscription_change_handler, app);
953
954         if (res != 0) {
955                 return NULL;
956         }
957         /* Bridge router holds a reference */
958         ao2_ref(app, +1);
959
960         app->router = stasis_message_router_create(app->topic);
961         if (!app->router) {
962                 return NULL;
963         }
964
965         res |= stasis_message_router_add(app->router,
966                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
967
968         res |= stasis_message_router_add(app->router,
969                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
970
971         res |= stasis_message_router_add_cache_update(app->router,
972                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
973
974         res |= stasis_message_router_add(app->router,
975                 stasis_subscription_change_type(), sub_subscription_change_handler, app);
976
977         stasis_message_router_set_formatters_default(app->router,
978                 sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
979
980         if (res != 0) {
981                 return NULL;
982         }
983         /* Router holds a reference */
984         ao2_ref(app, +1);
985
986         strncpy(app->name, name, size - sizeof(*app));
987         app->handler = handler;
988         app->data = ao2_bump(data);
989
990         /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
991          * this should only be done if a context does not already exist. */
992         strcpy(context_name, "stasis-");
993         strcat(context_name, name);
994         if (!ast_context_find(context_name)) {
995                 if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
996                         ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
997                 } else {
998                         ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
999                         ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1000                 }
1001         } else {
1002                 ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1003                         context_name, name);
1004         }
1005
1006         ao2_ref(app, +1);
1007         return app;
1008 }
1009
1010 struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
1011 {
1012         return app->topic;
1013 }
1014
1015 /*!
1016  * \brief Send a message to the given application.
1017  * \param app App to send the message to.
1018  * \param message Message to send.
1019  */
1020 void app_send(struct stasis_app *app, struct ast_json *message)
1021 {
1022         stasis_app_cb handler;
1023         char eid[20];
1024         void *data;
1025
1026         if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
1027                         ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1028                 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1029                         ast_json_string_get(ast_json_object_get(message, "type")));
1030         }
1031
1032         /* Copy off mutable state with lock held */
1033         ao2_lock(app);
1034         handler = app->handler;
1035         data = ao2_bump(app->data);
1036         ao2_unlock(app);
1037         /* Name is immutable; no need to copy */
1038
1039         if (handler) {
1040                 handler(data, app->name, message);
1041         } else {
1042                 ast_verb(3,
1043                         "Inactive Stasis app '%s' missed message\n", app->name);
1044         }
1045         ao2_cleanup(data);
1046 }
1047
1048 void app_deactivate(struct stasis_app *app)
1049 {
1050         ao2_lock(app);
1051
1052         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1053         app->handler = NULL;
1054         ao2_cleanup(app->data);
1055         app->data = NULL;
1056
1057         ao2_unlock(app);
1058 }
1059
1060 void app_shutdown(struct stasis_app *app)
1061 {
1062         ao2_lock(app);
1063
1064         ast_assert(app_is_finished(app));
1065
1066         stasis_message_router_unsubscribe(app->router);
1067         app->router = NULL;
1068         stasis_message_router_unsubscribe(app->bridge_router);
1069         app->bridge_router = NULL;
1070         stasis_message_router_unsubscribe(app->endpoint_router);
1071         app->endpoint_router = NULL;
1072
1073         ao2_unlock(app);
1074 }
1075
1076 int app_is_active(struct stasis_app *app)
1077 {
1078         int ret;
1079
1080         ao2_lock(app);
1081         ret = app->handler != NULL;
1082         ao2_unlock(app);
1083
1084         return ret;
1085 }
1086
1087 int app_is_finished(struct stasis_app *app)
1088 {
1089         int ret;
1090
1091         ao2_lock(app);
1092         ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1093         ao2_unlock(app);
1094
1095         return ret;
1096 }
1097
1098 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
1099 {
1100         ao2_lock(app);
1101         if (app->handler && app->data) {
1102                 struct ast_json *msg;
1103
1104                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1105
1106                 msg = ast_json_pack("{s: s, s: s}",
1107                         "type", "ApplicationReplaced",
1108                         "application", app->name);
1109                 if (msg) {
1110                         app_send(app, msg);
1111                         ast_json_unref(msg);
1112                 }
1113         } else {
1114                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1115         }
1116
1117         app->handler = handler;
1118         ao2_replace(app->data, data);
1119         ao2_unlock(app);
1120 }
1121
1122 const char *stasis_app_name(const struct stasis_app *app)
1123 {
1124         return app->name;
1125 }
1126
1127 static int forwards_filter_by_type(void *obj, void *arg, int flags)
1128 {
1129         struct app_forwards *forward = obj;
1130         enum forward_type *forward_type = arg;
1131
1132         if (forward->forward_type == *forward_type) {
1133                 return CMP_MATCH;
1134         }
1135
1136         return 0;
1137 }
1138
1139 void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1140 {
1141         struct ao2_iterator *channels;
1142         struct ao2_iterator *endpoints;
1143         struct ao2_iterator *bridges;
1144         struct app_forwards *forward;
1145         enum forward_type forward_type;
1146
1147         ast_cli(a->fd, "Name: %s\n"
1148                 "  Debug: %s\n"
1149                 "  Subscription Model: %s\n",
1150                 app->name,
1151                 app->debug ? "Yes" : "No",
1152                 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1153                         "Global Resource Subscription" :
1154                         "Application/Explicit Resource Subscription");
1155         ast_cli(a->fd, "  Subscriptions: %d\n", ao2_container_count(app->forwards));
1156
1157         ast_cli(a->fd, "    Channels:\n");
1158         forward_type = FORWARD_CHANNEL;
1159         channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
1160                 forwards_filter_by_type, &forward_type);
1161         if (channels) {
1162                 while ((forward = ao2_iterator_next(channels))) {
1163                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1164                         ao2_ref(forward, -1);
1165                 }
1166                 ao2_iterator_destroy(channels);
1167         }
1168
1169         ast_cli(a->fd, "    Bridges:\n");
1170         forward_type = FORWARD_BRIDGE;
1171         bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1172                 forwards_filter_by_type, &forward_type);
1173         if (bridges) {
1174                 while ((forward = ao2_iterator_next(bridges))) {
1175                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1176                         ao2_ref(forward, -1);
1177                 }
1178                 ao2_iterator_destroy(bridges);
1179         }
1180
1181         ast_cli(a->fd, "    Endpoints:\n");
1182         forward_type = FORWARD_ENDPOINT;
1183         endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
1184                 forwards_filter_by_type, &forward_type);
1185         if (endpoints) {
1186                 while ((forward = ao2_iterator_next(endpoints))) {
1187                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1188                         ao2_ref(forward, -1);
1189                 }
1190                 ao2_iterator_destroy(endpoints);
1191         }
1192 }
1193
1194 struct ast_json *app_to_json(const struct stasis_app *app)
1195 {
1196         struct ast_json *json;
1197         struct ast_json *channels;
1198         struct ast_json *bridges;
1199         struct ast_json *endpoints;
1200         struct ao2_iterator i;
1201         struct app_forwards *forwards;
1202
1203         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1204                 "name", app->name,
1205                 "channel_ids", "bridge_ids", "endpoint_ids");
1206         if (!json) {
1207                 return NULL;
1208         }
1209         channels = ast_json_object_get(json, "channel_ids");
1210         bridges = ast_json_object_get(json, "bridge_ids");
1211         endpoints = ast_json_object_get(json, "endpoint_ids");
1212
1213         i = ao2_iterator_init(app->forwards, 0);
1214         while ((forwards = ao2_iterator_next(&i))) {
1215                 struct ast_json *array = NULL;
1216                 int append_res;
1217
1218                 switch (forwards->forward_type) {
1219                 case FORWARD_CHANNEL:
1220                         array = channels;
1221                         break;
1222                 case FORWARD_BRIDGE:
1223                         array = bridges;
1224                         break;
1225                 case FORWARD_ENDPOINT:
1226                         array = endpoints;
1227                         break;
1228                 }
1229
1230                 /* If forward_type value is unexpected this will safely return an error. */
1231                 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1232                 ao2_ref(forwards, -1);
1233
1234                 if (append_res != 0) {
1235                         ast_log(LOG_ERROR, "Error building response\n");
1236                         ao2_iterator_destroy(&i);
1237                         ast_json_unref(json);
1238
1239                         return NULL;
1240                 }
1241         }
1242         ao2_iterator_destroy(&i);
1243
1244         return json;
1245 }
1246
1247 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1248 {
1249         struct app_forwards *forwards;
1250
1251         if (!app) {
1252                 return -1;
1253         }
1254
1255         ao2_lock(app->forwards);
1256         /* If subscribed to all, don't subscribe again */
1257         forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1258         if (forwards) {
1259                 ao2_unlock(app->forwards);
1260                 ao2_ref(forwards, -1);
1261
1262                 return 0;
1263         }
1264
1265         forwards = ao2_find(app->forwards,
1266                 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1267                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1268         if (!forwards) {
1269                 int res;
1270
1271                 /* Forwards not found, create one */
1272                 forwards = forwards_create_channel(app, chan);
1273                 if (!forwards) {
1274                         ao2_unlock(app->forwards);
1275
1276                         return -1;
1277                 }
1278
1279                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1280                 if (!res) {
1281                         ao2_unlock(app->forwards);
1282                         ao2_ref(forwards, -1);
1283
1284                         return -1;
1285                 }
1286         }
1287
1288         ++forwards->interested;
1289         ast_debug(3, "Channel '%s' is %d interested in %s\n",
1290                 chan ? ast_channel_uniqueid(chan) : "ALL",
1291                 forwards->interested,
1292                 app->name);
1293
1294         ao2_unlock(app->forwards);
1295         ao2_ref(forwards, -1);
1296
1297         return 0;
1298 }
1299
1300 static int subscribe_channel(struct stasis_app *app, void *obj)
1301 {
1302         return app_subscribe_channel(app, obj);
1303 }
1304
1305 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1306 {
1307         struct app_forwards *forwards;
1308
1309         if (!id) {
1310                 if (!strcmp(kind, "bridge")) {
1311                         id = BRIDGE_ALL;
1312                 } else if (!strcmp(kind, "channel")) {
1313                         id = CHANNEL_ALL;
1314                 } else if (!strcmp(kind, "endpoint")) {
1315                         id = ENDPOINT_ALL;
1316                 } else {
1317                         ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1318                         return -1;
1319                 }
1320         }
1321
1322         ao2_lock(app->forwards);
1323         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1324         if (!forwards) {
1325                 ao2_unlock(app->forwards);
1326                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1327                 return -1;
1328         }
1329         forwards->interested--;
1330
1331         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1332         if (forwards->interested == 0 || terminate) {
1333                 /* No one is interested any more; unsubscribe */
1334                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1335                 forwards_unsubscribe(forwards);
1336                 ao2_find(app->forwards, forwards,
1337                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1338                         OBJ_NODATA);
1339
1340                 if (!strcmp(kind, "endpoint")) {
1341                         messaging_app_unsubscribe_endpoint(app->name, id);
1342                 }
1343         }
1344         ao2_unlock(app->forwards);
1345         ao2_ref(forwards, -1);
1346
1347         return 0;
1348 }
1349
1350 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1351 {
1352         if (!app) {
1353                 return -1;
1354         }
1355
1356         return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1357 }
1358
1359 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
1360 {
1361         if (!app) {
1362                 return -1;
1363         }
1364
1365         return unsubscribe(app, "channel", channel_id, 0);
1366 }
1367
1368 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1369 {
1370         struct app_forwards *forwards;
1371
1372         if (ast_strlen_zero(channel_id)) {
1373                 channel_id = CHANNEL_ALL;
1374         }
1375         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1376         ao2_cleanup(forwards);
1377
1378         return forwards != NULL;
1379 }
1380
1381 static void *channel_find(const struct stasis_app *app, const char *id)
1382 {
1383         return ast_channel_get_by_name(id);
1384 }
1385
1386 struct stasis_app_event_source channel_event_source = {
1387         .scheme = "channel:",
1388         .find = channel_find,
1389         .subscribe = subscribe_channel,
1390         .unsubscribe = app_unsubscribe_channel_id,
1391         .is_subscribed = app_is_subscribed_channel_id
1392 };
1393
1394 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1395 {
1396         struct app_forwards *forwards;
1397
1398         if (!app) {
1399                 return -1;
1400         }
1401
1402         ao2_lock(app->forwards);
1403         /* If subscribed to all, don't subscribe again */
1404         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1405         if (forwards) {
1406                 ao2_unlock(app->forwards);
1407                 ao2_ref(forwards, -1);
1408
1409                 return 0;
1410         }
1411
1412         forwards = ao2_find(app->forwards,
1413                 bridge ? bridge->uniqueid : BRIDGE_ALL,
1414                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1415         if (!forwards) {
1416                 int res;
1417
1418                 /* Forwards not found, create one */
1419                 forwards = forwards_create_bridge(app, bridge);
1420                 if (!forwards) {
1421                         ao2_unlock(app->forwards);
1422
1423                         return -1;
1424                 }
1425
1426                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1427                 if (!res) {
1428                         ao2_unlock(app->forwards);
1429                         ao2_ref(forwards, -1);
1430
1431                         return -1;
1432                 }
1433         }
1434
1435         ++forwards->interested;
1436         ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1437                 bridge ? bridge->uniqueid : "ALL",
1438                 forwards->interested,
1439                 app->name);
1440
1441         ao2_unlock(app->forwards);
1442         ao2_ref(forwards, -1);
1443
1444         return 0;
1445 }
1446
1447 static int subscribe_bridge(struct stasis_app *app, void *obj)
1448 {
1449         return app_subscribe_bridge(app, obj);
1450 }
1451
1452 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1453 {
1454         if (!app) {
1455                 return -1;
1456         }
1457
1458         return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1459 }
1460
1461 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
1462 {
1463         if (!app) {
1464                 return -1;
1465         }
1466
1467         return unsubscribe(app, "bridge", bridge_id, 0);
1468 }
1469
1470 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1471 {
1472         struct app_forwards *forwards;
1473
1474         if (ast_strlen_zero(bridge_id)) {
1475                 bridge_id = BRIDGE_ALL;
1476         }
1477
1478         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1479         ao2_cleanup(forwards);
1480
1481         return forwards != NULL;
1482 }
1483
1484 static void *bridge_find(const struct stasis_app *app, const char *id)
1485 {
1486         return stasis_app_bridge_find_by_id(id);
1487 }
1488
1489 struct stasis_app_event_source bridge_event_source = {
1490         .scheme = "bridge:",
1491         .find = bridge_find,
1492         .subscribe = subscribe_bridge,
1493         .unsubscribe = app_unsubscribe_bridge_id,
1494         .is_subscribed = app_is_subscribed_bridge_id
1495 };
1496
1497 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1498 {
1499         struct app_forwards *forwards;
1500
1501         if (!app) {
1502                 return -1;
1503         }
1504
1505         ao2_lock(app->forwards);
1506         /* If subscribed to all, don't subscribe again */
1507         forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1508         if (forwards) {
1509                 ao2_unlock(app->forwards);
1510                 ao2_ref(forwards, -1);
1511
1512                 return 0;
1513         }
1514
1515         forwards = ao2_find(app->forwards,
1516                 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1517                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1518         if (!forwards) {
1519                 int res;
1520
1521                 /* Forwards not found, create one */
1522                 forwards = forwards_create_endpoint(app, endpoint);
1523                 if (!forwards) {
1524                         ao2_unlock(app->forwards);
1525
1526                         return -1;
1527                 }
1528
1529                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1530                 if (!res) {
1531                         ao2_unlock(app->forwards);
1532                         ao2_ref(forwards, -1);
1533
1534                         return -1;
1535                 }
1536
1537                 /* Subscribe for messages */
1538                 messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1539         }
1540
1541         ++forwards->interested;
1542         ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1543                 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1544                 forwards->interested,
1545                 app->name);
1546
1547         ao2_unlock(app->forwards);
1548         ao2_ref(forwards, -1);
1549
1550         return 0;
1551 }
1552
1553 static int subscribe_endpoint(struct stasis_app *app, void *obj)
1554 {
1555         return app_subscribe_endpoint(app, obj);
1556 }
1557
1558 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1559 {
1560         if (!app) {
1561                 return -1;
1562         }
1563
1564         return unsubscribe(app, "endpoint", endpoint_id, 0);
1565 }
1566
1567 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1568 {
1569         struct app_forwards *forwards;
1570
1571         if (ast_strlen_zero(endpoint_id)) {
1572                 endpoint_id = ENDPOINT_ALL;
1573         }
1574         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1575         ao2_cleanup(forwards);
1576
1577         return forwards != NULL;
1578 }
1579
1580 static void *endpoint_find(const struct stasis_app *app, const char *id)
1581 {
1582         return ast_endpoint_find_by_id(id);
1583 }
1584
1585 struct stasis_app_event_source endpoint_event_source = {
1586         .scheme = "endpoint:",
1587         .find = endpoint_find,
1588         .subscribe = subscribe_endpoint,
1589         .unsubscribe = app_unsubscribe_endpoint_id,
1590         .is_subscribed = app_is_subscribed_endpoint_id
1591 };
1592
1593 void stasis_app_register_event_sources(void)
1594 {
1595         stasis_app_register_event_source(&channel_event_source);
1596         stasis_app_register_event_source(&bridge_event_source);
1597         stasis_app_register_event_source(&endpoint_event_source);
1598 }
1599
1600 void stasis_app_unregister_event_sources(void)
1601 {
1602         stasis_app_unregister_event_source(&endpoint_event_source);
1603         stasis_app_unregister_event_source(&bridge_event_source);
1604         stasis_app_unregister_event_source(&channel_event_source);
1605 }