b99e23205e560fef439f0854bdb1a5048b245329
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_REGISTER_FILE()
29
30 #include "app.h"
31 #include "control.h"
32 #include "messaging.h"
33
34 #include "asterisk/callerid.h"
35 #include "asterisk/stasis_app.h"
36 #include "asterisk/stasis_bridges.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40
/*
 * Forward declaration: the cache update handlers below call unsubscribe()
 * before its definition appears.
 */
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
42
/*! State kept for one registered Stasis application. */
struct stasis_app {
	/*! Aggregation topic for this application. */
	struct stasis_topic *topic;
	/*! Router for handling messages forwarded to \a topic. */
	struct stasis_message_router *router;
	/*! Router subscribed to the topic returned by ast_bridge_topic_all(). */
	struct stasis_message_router *bridge_router;
	/*! Container of struct app_forwards objects, sorted by their id. */
	struct ao2_container *forwards;
	/*! Callback function for this application; NULL once deactivated. */
	stasis_app_cb handler;
	/*! Opaque ao2 object handed to the callback function (may be NULL). */
	void *data;
	/*! Name of the Stasis application (allocated together with the struct). */
	char name[];
};
59
/*! Kind of object whose topics an app_forwards entry forwards to an app. */
enum forward_type {
	/*! Set by forwards_create_channel() */
	FORWARD_CHANNEL,
	/*! Set by forwards_create_bridge() */
	FORWARD_BRIDGE,
	/*! Set by forwards_create_endpoint() */
	FORWARD_ENDPOINT,
};
65
/*! Subscription info for a particular channel/bridge/endpoint. */
struct app_forwards {
	/*! Count of number of times this object has been subscribed */
	int interested;

	/*! Forward for the regular topic */
	struct stasis_forward *topic_forward;
	/*! Forward for the caching topic */
	struct stasis_forward *topic_cached_forward;

	/*! Type of object being forwarded */
	enum forward_type forward_type;
	/*! Unique id of the object being forwarded (allocated with the struct) */
	char id[];
};
81
/*!
 * \brief Destructor for app_forwards objects.
 *
 * Releases nothing itself; it only asserts that both forwards were already
 * cancelled and cleared (as forwards_unsubscribe() does) before the object
 * is destroyed.
 *
 * \param obj The app_forwards object being destroyed.
 */
static void forwards_dtor(void *obj)
{
#ifdef AST_DEVMODE
	/* Only declared in dev mode, where the assertions below use it. */
	struct app_forwards *forwards = obj;
#endif /* AST_DEVMODE */

	ast_assert(forwards->topic_forward == NULL);
	ast_assert(forwards->topic_cached_forward == NULL);
}
91
92 static void forwards_unsubscribe(struct app_forwards *forwards)
93 {
94         stasis_forward_cancel(forwards->topic_forward);
95         forwards->topic_forward = NULL;
96         stasis_forward_cancel(forwards->topic_cached_forward);
97         forwards->topic_cached_forward = NULL;
98 }
99
100 static struct app_forwards *forwards_create(struct stasis_app *app,
101         const char *id)
102 {
103         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
104
105         if (!app || ast_strlen_zero(id)) {
106                 return NULL;
107         }
108
109         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
110         if (!forwards) {
111                 return NULL;
112         }
113
114         strcpy(forwards->id, id);
115
116         ao2_ref(forwards, +1);
117         return forwards;
118 }
119
120 /*! Forward a channel's topics to an app */
121 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
122         struct ast_channel *chan)
123 {
124         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
125
126         if (!app || !chan) {
127                 return NULL;
128         }
129
130         forwards = forwards_create(app, ast_channel_uniqueid(chan));
131         if (!forwards) {
132                 return NULL;
133         }
134
135         forwards->forward_type = FORWARD_CHANNEL;
136         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
137                 app->topic);
138         if (!forwards->topic_forward) {
139                 return NULL;
140         }
141
142         forwards->topic_cached_forward = stasis_forward_all(
143                 ast_channel_topic_cached(chan), app->topic);
144         if (!forwards->topic_cached_forward) {
145                 /* Half-subscribed is a bad thing */
146                 stasis_forward_cancel(forwards->topic_forward);
147                 forwards->topic_forward = NULL;
148                 return NULL;
149         }
150
151         ao2_ref(forwards, +1);
152         return forwards;
153 }
154
155 /*! Forward a bridge's topics to an app */
156 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
157         struct ast_bridge *bridge)
158 {
159         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
160
161         if (!app || !bridge) {
162                 return NULL;
163         }
164
165         forwards = forwards_create(app, bridge->uniqueid);
166         if (!forwards) {
167                 return NULL;
168         }
169
170         forwards->forward_type = FORWARD_BRIDGE;
171         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
172                 app->topic);
173         if (!forwards->topic_forward) {
174                 return NULL;
175         }
176
177         forwards->topic_cached_forward = stasis_forward_all(
178                 ast_bridge_topic_cached(bridge), app->topic);
179         if (!forwards->topic_cached_forward) {
180                 /* Half-subscribed is a bad thing */
181                 stasis_forward_cancel(forwards->topic_forward);
182                 forwards->topic_forward = NULL;
183                 return NULL;
184         }
185
186         ao2_ref(forwards, +1);
187         return forwards;
188 }
189
190 /*! Forward a endpoint's topics to an app */
191 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
192         struct ast_endpoint *endpoint)
193 {
194         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
195
196         if (!app || !endpoint) {
197                 return NULL;
198         }
199
200         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
201         if (!forwards) {
202                 return NULL;
203         }
204
205         forwards->forward_type = FORWARD_ENDPOINT;
206         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
207                 app->topic);
208         if (!forwards->topic_forward) {
209                 return NULL;
210         }
211
212         forwards->topic_cached_forward = stasis_forward_all(
213                 ast_endpoint_topic_cached(endpoint), app->topic);
214         if (!forwards->topic_cached_forward) {
215                 /* Half-subscribed is a bad thing */
216                 stasis_forward_cancel(forwards->topic_forward);
217                 forwards->topic_forward = NULL;
218                 return NULL;
219         }
220
221         ao2_ref(forwards, +1);
222         return forwards;
223 }
224
225 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
226 {
227         const struct app_forwards *object_left = obj_left;
228         const struct app_forwards *object_right = obj_right;
229         const char *right_key = obj_right;
230         int cmp;
231
232         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
233         case OBJ_POINTER:
234                 right_key = object_right->id;
235                 /* Fall through */
236         case OBJ_KEY:
237                 cmp = strcmp(object_left->id, right_key);
238                 break;
239         case OBJ_PARTIAL_KEY:
240                 /*
241                  * We could also use a partial key struct containing a length
242                  * so strlen() does not get called for every comparison instead.
243                  */
244                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
245                 break;
246         default:
247                 /* Sort can only work on something with a full or partial key. */
248                 ast_assert(0);
249                 cmp = 0;
250                 break;
251         }
252         return cmp;
253 }
254
/*!
 * \brief Destructor for stasis_app objects.
 *
 * Asserts that both routers were already cleared (app_shutdown() does
 * this), then drops the references held on the topic, the forwards
 * container and the callback data.
 *
 * \param obj The stasis_app being destroyed.
 */
static void app_dtor(void *obj)
{
	struct stasis_app *app = obj;

	ast_verb(1, "Destroying Stasis app %s\n", app->name);

	ast_assert(app->router == NULL);
	ast_assert(app->bridge_router == NULL);

	ao2_cleanup(app->topic);
	app->topic = NULL;
	ao2_cleanup(app->forwards);
	app->forwards = NULL;
	ao2_cleanup(app->data);
	app->data = NULL;
}
271
272 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
273 {
274         struct ast_multi_channel_blob *payload = stasis_message_data(message);
275         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
276         struct ast_channel *chan;
277
278         if (!snapshot) {
279                 return;
280         }
281
282         chan = ast_channel_get_by_name(snapshot->uniqueid);
283         if (!chan) {
284                 return;
285         }
286
287         app_subscribe_channel(app, chan);
288         ast_channel_unref(chan);
289 }
290
/*!
 * \brief Default route for the app's message router.
 *
 * Dial messages are first inspected for a forwarded channel. Any message
 * that has a JSON representation is then sent to the application.
 *
 * When \a message is the subscription's final message, the reference the
 * router holds on the app is released. That release is deliberately done
 * last: it may be the final reference, so \a app must not be touched
 * afterwards.
 *
 * \param data The stasis_app this router belongs to.
 * \param sub Subscription the message arrived on.
 * \param message The message to handle.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;
	struct ast_json *json;
	int is_final = stasis_subscription_final_message(sub, message);

	if (stasis_message_type(message) == ast_channel_dial_type()) {
		call_forwarded_handler(app, message);
	}

	/* By default, send any message that has a JSON representation */
	json = stasis_message_to_json(message, stasis_app_get_sanitizer());
	if (json) {
		app_send(app, json);
		ast_json_unref(json);
	}

	if (is_final) {
		/* Drop the router's reference only once we are done with app */
		ao2_cleanup(app);
	}
}
313
/*!
 * \brief Typedef for callbacks that get called on channel snapshot updates
 *
 * Each callback receives the old and new snapshots (either may be NULL)
 * and the event timestamp, and returns a JSON event or NULL when it has
 * nothing to report.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
319
320 static struct ast_json *simple_channel_event(
321         const char *type,
322         struct ast_channel_snapshot *snapshot,
323         const struct timeval *tv)
324 {
325         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
326
327         if (!json_channel) {
328                 return NULL;
329         }
330
331         return ast_json_pack("{s: s, s: o, s: o}",
332                 "type", type,
333                 "timestamp", ast_json_timeval(*tv, NULL),
334                 "channel", json_channel);
335 }
336
/*!
 * \brief Build a "ChannelCreated" event for \a snapshot.
 *
 * \return The JSON event, or NULL if it could not be built.
 */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event;

	event = simple_channel_event("ChannelCreated", snapshot, tv);

	return event;
}
343
344 static struct ast_json *channel_destroyed_event(
345         struct ast_channel_snapshot *snapshot,
346         const struct timeval *tv)
347 {
348         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
349
350         if (!json_channel) {
351                 return NULL;
352         }
353
354         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
355                 "type", "ChannelDestroyed",
356                 "timestamp", ast_json_timeval(*tv, NULL),
357                 "cause", snapshot->hangupcause,
358                 "cause_txt", ast_cause2str(snapshot->hangupcause),
359                 "channel", json_channel);
360 }
361
/*!
 * \brief Build a "ChannelStateChange" event for \a snapshot.
 *
 * \return The JSON event, or NULL if it could not be built.
 */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event;

	event = simple_channel_event("ChannelStateChange", snapshot, tv);

	return event;
}
368
369 /*! \brief Handle channel state changes */
370 static struct ast_json *channel_state(
371         struct ast_channel_snapshot *old_snapshot,
372         struct ast_channel_snapshot *new_snapshot,
373         const struct timeval *tv)
374 {
375         struct ast_channel_snapshot *snapshot = new_snapshot ?
376                 new_snapshot : old_snapshot;
377
378         if (!old_snapshot) {
379                 return channel_created_event(snapshot, tv);
380         } else if (!new_snapshot) {
381                 return channel_destroyed_event(snapshot, tv);
382         } else if (old_snapshot->state != new_snapshot->state) {
383                 return channel_state_change_event(snapshot, tv);
384         }
385
386         return NULL;
387 }
388
389 static struct ast_json *channel_dialplan(
390         struct ast_channel_snapshot *old_snapshot,
391         struct ast_channel_snapshot *new_snapshot,
392         const struct timeval *tv)
393 {
394         struct ast_json *json_channel;
395
396         /* No Newexten event on cache clear or first event */
397         if (!old_snapshot || !new_snapshot) {
398                 return NULL;
399         }
400
401         /* Empty application is not valid for a Newexten event */
402         if (ast_strlen_zero(new_snapshot->appl)) {
403                 return NULL;
404         }
405
406         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
407                 return NULL;
408         }
409
410         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
411         if (!json_channel) {
412                 return NULL;
413         }
414
415         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
416                 "type", "ChannelDialplan",
417                 "timestamp", ast_json_timeval(*tv, NULL),
418                 "dialplan_app", new_snapshot->appl,
419                 "dialplan_app_data", new_snapshot->data,
420                 "channel", json_channel);
421 }
422
423 static struct ast_json *channel_callerid(
424         struct ast_channel_snapshot *old_snapshot,
425         struct ast_channel_snapshot *new_snapshot,
426         const struct timeval *tv)
427 {
428         struct ast_json *json_channel;
429
430         /* No NewCallerid event on cache clear or first event */
431         if (!old_snapshot || !new_snapshot) {
432                 return NULL;
433         }
434
435         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
436                 return NULL;
437         }
438
439         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
440         if (!json_channel) {
441                 return NULL;
442         }
443
444         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
445                 "type", "ChannelCallerId",
446                 "timestamp", ast_json_timeval(*tv, NULL),
447                 "caller_presentation", new_snapshot->caller_pres,
448                 "caller_presentation_txt", ast_describe_caller_presentation(
449                         new_snapshot->caller_pres),
450                 "channel", json_channel);
451 }
452
453 static struct ast_json *channel_connected_line(
454         struct ast_channel_snapshot *old_snapshot,
455         struct ast_channel_snapshot *new_snapshot,
456         const struct timeval *tv)
457 {
458         struct ast_json *json_channel;
459
460         /* No ChannelConnectedLine event on cache clear or first event */
461         if (!old_snapshot || !new_snapshot) {
462                 return NULL;
463         }
464
465         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
466                 return NULL;
467         }
468
469         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
470         if (!json_channel) {
471                 return NULL;
472         }
473
474         return ast_json_pack("{s: s, s: o, s: o}",
475                 "type", "ChannelConnectedLine",
476                 "timestamp", ast_json_timeval(*tv, NULL),
477                 "channel", json_channel);
478 }
479
/*!
 * Monitors run, in this order, by sub_channel_update_handler() for every
 * channel snapshot update; each may produce one event.
 */
static channel_snapshot_monitor channel_monitors[] = {
	channel_state,
	channel_dialplan,
	channel_callerid,
	channel_connected_line,
};
486
487 static void sub_channel_update_handler(void *data,
488         struct stasis_subscription *sub,
489         struct stasis_message *message)
490 {
491         struct stasis_app *app = data;
492         struct stasis_cache_update *update;
493         struct ast_channel_snapshot *new_snapshot;
494         struct ast_channel_snapshot *old_snapshot;
495         const struct timeval *tv;
496         int i;
497
498         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
499
500         update = stasis_message_data(message);
501
502         ast_assert(update->type == ast_channel_snapshot_type());
503
504         new_snapshot = stasis_message_data(update->new_snapshot);
505         old_snapshot = stasis_message_data(update->old_snapshot);
506
507         /* Pull timestamp from the new snapshot, or from the update message
508          * when there isn't one. */
509         tv = update->new_snapshot ?
510                 stasis_message_timestamp(update->new_snapshot) :
511                 stasis_message_timestamp(message);
512
513         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
514                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
515
516                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
517                 if (msg) {
518                         app_send(app, msg);
519                 }
520         }
521
522         if (!new_snapshot && old_snapshot) {
523                 unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
524         }
525 }
526
527 static struct ast_json *simple_endpoint_event(
528         const char *type,
529         struct ast_endpoint_snapshot *snapshot,
530         const struct timeval *tv)
531 {
532         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
533
534         if (!json_endpoint) {
535                 return NULL;
536         }
537
538         return ast_json_pack("{s: s, s: o, s: o}",
539                 "type", type,
540                 "timestamp", ast_json_timeval(*tv, NULL),
541                 "endpoint", json_endpoint);
542 }
543
544 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
545 {
546         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
547         struct ast_json *json_endpoint;
548         struct stasis_app *app = pvt;
549         char *tech;
550         char *resource;
551
552         tech = ast_strdupa(endpoint_id);
553         resource = strchr(tech, '/');
554         if (resource) {
555                 resource[0] = '\0';
556                 resource++;
557         }
558
559         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
560                 return -1;
561         }
562
563         snapshot = ast_endpoint_latest_snapshot(tech, resource);
564         if (!snapshot) {
565                 return -1;
566         }
567
568         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
569         if (!json_endpoint) {
570                 return -1;
571         }
572
573         app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
574                 "type", "TextMessageReceived",
575                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
576                 "endpoint", json_endpoint,
577                 "message", json_msg));
578
579         return 0;
580 }
581
582 static void sub_endpoint_update_handler(void *data,
583         struct stasis_subscription *sub,
584         struct stasis_message *message)
585 {
586         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
587         struct stasis_app *app = data;
588         struct stasis_cache_update *update;
589         struct ast_endpoint_snapshot *new_snapshot;
590         struct ast_endpoint_snapshot *old_snapshot;
591         const struct timeval *tv;
592
593         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
594
595         update = stasis_message_data(message);
596
597         ast_assert(update->type == ast_endpoint_snapshot_type());
598
599         new_snapshot = stasis_message_data(update->new_snapshot);
600         old_snapshot = stasis_message_data(update->old_snapshot);
601
602         if (new_snapshot) {
603                 tv = stasis_message_timestamp(update->new_snapshot);
604
605                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
606                 if (!json) {
607                         return;
608                 }
609
610                 app_send(app, json);
611         }
612
613         if (!new_snapshot && old_snapshot) {
614                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
615         }
616 }
617
618 static struct ast_json *simple_bridge_event(
619         const char *type,
620         struct ast_bridge_snapshot *snapshot,
621         const struct timeval *tv)
622 {
623         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
624         if (!json_bridge) {
625                 return NULL;
626         }
627
628         return ast_json_pack("{s: s, s: o, s: o}",
629                 "type", type,
630                 "timestamp", ast_json_timeval(*tv, NULL),
631                 "bridge", json_bridge);
632 }
633
634 static void sub_bridge_update_handler(void *data,
635         struct stasis_subscription *sub,
636         struct stasis_message *message)
637 {
638         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
639         struct stasis_app *app = data;
640         struct stasis_cache_update *update;
641         struct ast_bridge_snapshot *new_snapshot;
642         struct ast_bridge_snapshot *old_snapshot;
643         const struct timeval *tv;
644
645         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
646
647         update = stasis_message_data(message);
648
649         ast_assert(update->type == ast_bridge_snapshot_type());
650
651         new_snapshot = stasis_message_data(update->new_snapshot);
652         old_snapshot = stasis_message_data(update->old_snapshot);
653         tv = update->new_snapshot ?
654                 stasis_message_timestamp(update->new_snapshot) :
655                 stasis_message_timestamp(message);
656
657         if (!new_snapshot) {
658                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
659         } else if (!old_snapshot) {
660                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
661         }
662
663         if (json) {
664                 app_send(app, json);
665         }
666
667         if (!new_snapshot && old_snapshot) {
668                 unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
669         }
670 }
671
672
673 /*! \brief Helper function for determining if the application is subscribed to a given entity */
674 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
675 {
676         struct app_forwards *forwards = NULL;
677
678         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
679         if (!forwards) {
680                 return 0;
681         }
682
683         ao2_ref(forwards, -1);
684         return 1;
685 }
686
687 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
688         struct stasis_message *message)
689 {
690         struct stasis_app *app = data;
691         struct ast_bridge_merge_message *merge;
692
693         merge = stasis_message_data(message);
694
695         /* Find out if we're subscribed to either bridge */
696         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
697                 bridge_app_subscribed(app, merge->to->uniqueid)) {
698                 /* Forward the message to the app */
699                 stasis_publish(app->topic, message);
700         }
701 }
702
703 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
704 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
705 {
706         int subscribed = 0;
707         struct ao2_iterator iter;
708         char *uniqueid;
709
710         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
711                 return 1;
712         }
713
714         iter = ao2_iterator_init(snapshot->channels, 0);
715         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
716                 if (bridge_app_subscribed(app, uniqueid)) {
717                         subscribed = 1;
718                         ao2_ref(uniqueid, -1);
719                         break;
720                 }
721         }
722         ao2_iterator_destroy(&iter);
723
724         return subscribed;
725 }
726
727 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
728         struct stasis_message *message)
729 {
730         struct stasis_app *app = data;
731         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
732         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
733
734         if (bridge_app_subscribed(app, transfer_msg->transferer->uniqueid) ||
735                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
736                 stasis_publish(app->topic, message);
737         }
738 }
739
740 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
741         struct stasis_message *message)
742 {
743         struct stasis_app *app = data;
744         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
745         int subscribed = 0;
746
747         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
748         if (!subscribed) {
749                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
750         }
751         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
752                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
753         }
754         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
755                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
756         }
757
758         if (!subscribed) {
759                 switch (transfer_msg->dest_type) {
760                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
761                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
762                         break;
763                 case AST_ATTENDED_TRANSFER_DEST_LINK:
764                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
765                         if (!subscribed) {
766                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
767                         }
768                         break;
769                 break;
770                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
771                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
772                         if (!subscribed) {
773                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
774                         }
775                         break;
776                 default:
777                         break;
778                 }
779         }
780
781         if (subscribed) {
782                 stasis_publish(app->topic, message);
783         }
784 }
785
/*!
 * \brief Default route for the app's bridge router.
 *
 * Only the subscription's final message is acted upon: it releases the
 * reference the bridge router holds on the app.
 *
 * \param data The stasis_app this router belongs to.
 * \param sub Subscription the message arrived on.
 * \param message The message to inspect.
 */
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
795
796 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
797 {
798         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
799         size_t size;
800         int res = 0;
801
802         ast_assert(name != NULL);
803         ast_assert(handler != NULL);
804
805         ast_verb(1, "Creating Stasis app '%s'\n", name);
806
807         size = sizeof(*app) + strlen(name) + 1;
808         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
809
810         if (!app) {
811                 return NULL;
812         }
813
814         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
815                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
816                 forwards_sort, NULL);
817         if (!app->forwards) {
818                 return NULL;
819         }
820
821         app->topic = stasis_topic_create(name);
822         if (!app->topic) {
823                 return NULL;
824         }
825
826         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
827         if (!app->bridge_router) {
828                 return NULL;
829         }
830
831         res |= stasis_message_router_add(app->bridge_router,
832                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
833
834         res |= stasis_message_router_add(app->bridge_router,
835                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
836
837         res |= stasis_message_router_add(app->bridge_router,
838                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
839
840         res |= stasis_message_router_set_default(app->bridge_router,
841                 bridge_default_handler, app);
842
843         if (res != 0) {
844                 return NULL;
845         }
846         /* Bridge router holds a reference */
847         ao2_ref(app, +1);
848
849         app->router = stasis_message_router_create(app->topic);
850         if (!app->router) {
851                 return NULL;
852         }
853
854         res |= stasis_message_router_add_cache_update(app->router,
855                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
856
857         res |= stasis_message_router_add_cache_update(app->router,
858                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
859
860         res |= stasis_message_router_add_cache_update(app->router,
861                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
862
863         res |= stasis_message_router_set_default(app->router,
864                 sub_default_handler, app);
865
866         if (res != 0) {
867                 return NULL;
868         }
869         /* Router holds a reference */
870         ao2_ref(app, +1);
871
872         strncpy(app->name, name, size - sizeof(*app));
873         app->handler = handler;
874         app->data = ao2_bump(data);
875
876         ao2_ref(app, +1);
877         return app;
878 }
879
880 struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
881         return app->topic;
882 }
883
884 /*!
885  * \brief Send a message to the given application.
886  * \param app App to send the message to.
887  * \param message Message to send.
888  */
889 void app_send(struct stasis_app *app, struct ast_json *message)
890 {
891         stasis_app_cb handler;
892         RAII_VAR(void *, data, NULL, ao2_cleanup);
893
894         /* Copy off mutable state with lock held */
895         {
896                 SCOPED_AO2LOCK(lock, app);
897                 handler = app->handler;
898                 if (app->data) {
899                         ao2_ref(app->data, +1);
900                         data = app->data;
901                 }
902                 /* Name is immutable; no need to copy */
903         }
904
905         if (!handler) {
906                 ast_verb(3,
907                         "Inactive Stasis app '%s' missed message\n", app->name);
908                 return;
909         }
910
911         handler(data, app->name, message);
912 }
913
914 void app_deactivate(struct stasis_app *app)
915 {
916         SCOPED_AO2LOCK(lock, app);
917         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
918         app->handler = NULL;
919         ao2_cleanup(app->data);
920         app->data = NULL;
921 }
922
923 void app_shutdown(struct stasis_app *app)
924 {
925         SCOPED_AO2LOCK(lock, app);
926
927         ast_assert(app_is_finished(app));
928
929         stasis_message_router_unsubscribe(app->router);
930         app->router = NULL;
931         stasis_message_router_unsubscribe(app->bridge_router);
932         app->bridge_router = NULL;
933 }
934
935 int app_is_active(struct stasis_app *app)
936 {
937         SCOPED_AO2LOCK(lock, app);
938         return app->handler != NULL;
939 }
940
941 int app_is_finished(struct stasis_app *app)
942 {
943         SCOPED_AO2LOCK(lock, app);
944
945         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
946 }
947
948 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
949 {
950         SCOPED_AO2LOCK(lock, app);
951
952         if (app->handler && app->data) {
953                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
954
955                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
956
957                 msg = ast_json_pack("{s: s, s: s}",
958                         "type", "ApplicationReplaced",
959                         "application", app->name);
960                 if (msg) {
961                         app_send(app, msg);
962                 }
963         } else {
964                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
965         }
966
967         app->handler = handler;
968         ao2_cleanup(app->data);
969         if (data) {
970                 ao2_ref(data, +1);
971         }
972         app->data = data;
973 }
974
975 const char *app_name(const struct stasis_app *app)
976 {
977         return app->name;
978 }
979
980 struct ast_json *app_to_json(const struct stasis_app *app)
981 {
982         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
983         struct ast_json *channels;
984         struct ast_json *bridges;
985         struct ast_json *endpoints;
986         struct ao2_iterator i;
987         void *obj;
988
989         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
990                 "name", app->name,
991                 "channel_ids", "bridge_ids", "endpoint_ids");
992         channels = ast_json_object_get(json, "channel_ids");
993         bridges = ast_json_object_get(json, "bridge_ids");
994         endpoints = ast_json_object_get(json, "endpoint_ids");
995
996         i = ao2_iterator_init(app->forwards, 0);
997         while ((obj = ao2_iterator_next(&i))) {
998                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
999                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
1000                 int append_res = -1;
1001
1002                 id = ast_json_string_create(forwards->id);
1003
1004                 switch (forwards->forward_type) {
1005                 case FORWARD_CHANNEL:
1006                         append_res = ast_json_array_append(channels,
1007                                 ast_json_ref(id));
1008                         break;
1009                 case FORWARD_BRIDGE:
1010                         append_res = ast_json_array_append(bridges,
1011                                 ast_json_ref(id));
1012                         break;
1013                 case FORWARD_ENDPOINT:
1014                         append_res = ast_json_array_append(endpoints,
1015                                 ast_json_ref(id));
1016                         break;
1017                 }
1018
1019                 if (append_res != 0) {
1020                         ast_log(LOG_ERROR, "Error building response\n");
1021                         ao2_iterator_destroy(&i);
1022                         return NULL;
1023                 }
1024         }
1025         ao2_iterator_destroy(&i);
1026
1027         return ast_json_ref(json);
1028 }
1029
1030 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1031 {
1032         int res;
1033
1034         if (!app || !chan) {
1035                 return -1;
1036         } else {
1037                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1038                 SCOPED_AO2LOCK(lock, app->forwards);
1039
1040                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
1041                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1042                 if (!forwards) {
1043                         /* Forwards not found, create one */
1044                         forwards = forwards_create_channel(app, chan);
1045                         if (!forwards) {
1046                                 return -1;
1047                         }
1048
1049                         res = ao2_link_flags(app->forwards, forwards,
1050                                 OBJ_NOLOCK);
1051                         if (!res) {
1052                                 return -1;
1053                         }
1054                 }
1055
1056                 ++forwards->interested;
1057                 ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
1058                 return 0;
1059         }
1060 }
1061
/*!
 * \brief Event source callback adapter around app_subscribe_channel().
 * \param app Application to subscribe.
 * \param obj Untyped pointer passed on as the channel argument.
 * \return Whatever app_subscribe_channel() returns.
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1066
1067 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1068 {
1069         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1070         SCOPED_AO2LOCK(lock, app->forwards);
1071
1072         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1073         if (!forwards) {
1074                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1075                 return -1;
1076         }
1077         forwards->interested--;
1078
1079         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1080         if (forwards->interested == 0 || terminate) {
1081                 /* No one is interested any more; unsubscribe */
1082                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1083                 forwards_unsubscribe(forwards);
1084                 ao2_find(app->forwards, forwards,
1085                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1086                         OBJ_NODATA);
1087
1088                 if (!strcmp(kind, "endpoint")) {
1089                         messaging_app_unsubscribe_endpoint(app->name, id);
1090                 }
1091         }
1092
1093         return 0;
1094 }
1095
/*!
 * \brief Unsubscribe a Stasis application from a channel.
 * \param app Application to unsubscribe.
 * \param chan Channel whose unique id identifies the subscription.
 * \retval -1 if either argument is NULL.
 * \return Otherwise, the result of app_unsubscribe_channel_id().
 */
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
	}

	return -1;
}
1104
/*!
 * \brief Unsubscribe a Stasis application from a channel, by id.
 * \param app Application to unsubscribe.
 * \param channel_id Key of the channel's forwards entry.
 * \retval -1 if either argument is NULL.
 * \return Otherwise, the result of unsubscribe() without forced removal.
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	if (app && channel_id) {
		return unsubscribe(app, "channel", channel_id, 0);
	}

	return -1;
}
1113
1114 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1115 {
1116         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1117         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1118         return forwards != NULL;
1119 }
1120
/*!
 * \brief Event source lookup callback for channels.
 * \param app Unused.
 * \param id Identifier handed to ast_channel_get_by_name().
 * \return The result of ast_channel_get_by_name().
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	void *chan = ast_channel_get_by_name(id);

	return chan;
}
1125
1126 struct stasis_app_event_source channel_event_source = {
1127         .scheme = "channel:",
1128         .find = channel_find,
1129         .subscribe = subscribe_channel,
1130         .unsubscribe = app_unsubscribe_channel_id,
1131         .is_subscribed = app_is_subscribed_channel_id
1132 };
1133
1134 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1135 {
1136         if (!app || !bridge) {
1137                 return -1;
1138         } else {
1139                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1140                 SCOPED_AO2LOCK(lock, app->forwards);
1141
1142                 forwards = ao2_find(app->forwards, bridge->uniqueid,
1143                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1144
1145                 if (!forwards) {
1146                         /* Forwards not found, create one */
1147                         forwards = forwards_create_bridge(app, bridge);
1148                         if (!forwards) {
1149                                 return -1;
1150                         }
1151                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1152                 }
1153
1154                 ++forwards->interested;
1155                 ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
1156                 return 0;
1157         }
1158 }
1159
/*!
 * \brief Event source callback adapter around app_subscribe_bridge().
 * \param app Application to subscribe.
 * \param obj Untyped pointer passed on as the bridge argument.
 * \return Whatever app_subscribe_bridge() returns.
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1164
1165 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1166 {
1167         if (!app || !bridge) {
1168                 return -1;
1169         }
1170
1171         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
1172 }
1173
/*!
 * \brief Unsubscribe a Stasis application from a bridge, by id.
 * \param app Application to unsubscribe.
 * \param bridge_id Key of the bridge's forwards entry.
 * \retval -1 if either argument is NULL.
 * \return Otherwise, the result of unsubscribe() without forced removal.
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	if (app && bridge_id) {
		return unsubscribe(app, "bridge", bridge_id, 0);
	}

	return -1;
}
1182
1183 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1184 {
1185         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1186         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1187         return forwards != NULL;
1188 }
1189
/*!
 * \brief Event source lookup callback for bridges.
 * \param app Unused.
 * \param id Identifier handed to stasis_app_bridge_find_by_id().
 * \return The result of stasis_app_bridge_find_by_id().
 */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	void *bridge = stasis_app_bridge_find_by_id(id);

	return bridge;
}
1194
1195 struct stasis_app_event_source bridge_event_source = {
1196         .scheme = "bridge:",
1197         .find = bridge_find,
1198         .subscribe = subscribe_bridge,
1199         .unsubscribe = app_unsubscribe_bridge_id,
1200         .is_subscribed = app_is_subscribed_bridge_id
1201 };
1202
1203 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1204 {
1205         if (!app || !endpoint) {
1206                 return -1;
1207         } else {
1208                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1209                 SCOPED_AO2LOCK(lock, app->forwards);
1210
1211                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
1212                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1213
1214                 if (!forwards) {
1215                         /* Forwards not found, create one */
1216                         forwards = forwards_create_endpoint(app, endpoint);
1217                         if (!forwards) {
1218                                 return -1;
1219                         }
1220                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1221
1222                         /* Subscribe for messages */
1223                         messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1224                 }
1225
1226                 ++forwards->interested;
1227                 ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
1228                 return 0;
1229         }
1230 }
1231
/*!
 * \brief Event source callback adapter around app_subscribe_endpoint().
 * \param app Application to subscribe.
 * \param obj Untyped pointer passed on as the endpoint argument.
 * \return Whatever app_subscribe_endpoint() returns.
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1236
/*!
 * \brief Unsubscribe a Stasis application from an endpoint, by id.
 * \param app Application to unsubscribe.
 * \param endpoint_id Key of the endpoint's forwards entry.
 * \retval -1 if either argument is NULL.
 * \return Otherwise, the result of unsubscribe() without forced removal.
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	if (app && endpoint_id) {
		return unsubscribe(app, "endpoint", endpoint_id, 0);
	}

	return -1;
}
1245
1246 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1247 {
1248         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1249         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1250         return forwards != NULL;
1251 }
1252
/*!
 * \brief Event source lookup callback for endpoints.
 * \param app Unused.
 * \param id Identifier handed to ast_endpoint_find_by_id().
 * \return The result of ast_endpoint_find_by_id().
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	void *endpoint = ast_endpoint_find_by_id(id);

	return endpoint;
}
1257
1258 struct stasis_app_event_source endpoint_event_source = {
1259         .scheme = "endpoint:",
1260         .find = endpoint_find,
1261         .subscribe = subscribe_endpoint,
1262         .unsubscribe = app_unsubscribe_endpoint_id,
1263         .is_subscribed = app_is_subscribed_endpoint_id
1264 };
1265
1266 void stasis_app_register_event_sources(void)
1267 {
1268         stasis_app_register_event_source(&channel_event_source);
1269         stasis_app_register_event_source(&bridge_event_source);
1270         stasis_app_register_event_source(&endpoint_event_source);
1271 }
1272
1273 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1274 {
1275         return obj == &endpoint_event_source ||
1276                 obj == &bridge_event_source ||
1277                 obj == &channel_event_source;
1278 }
1279
1280 void stasis_app_unregister_event_sources(void)
1281 {
1282         stasis_app_unregister_event_source(&endpoint_event_source);
1283         stasis_app_unregister_event_source(&bridge_event_source);
1284         stasis_app_unregister_event_source(&channel_event_source);
1285 }
1286
1287