Fix race condition that could result in ARI transfer messages not being sent.
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31 #include "control.h"
32 #include "messaging.h"
33
34 #include "asterisk/callerid.h"
35 #include "asterisk/stasis_app.h"
36 #include "asterisk/stasis_bridges.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40
/*
 * Forward declaration; being static, the definition must live elsewhere in
 * this file.  The cache-update handlers below call it with \a terminate set
 * to 1 once the snapshot of a channel, endpoint or bridge has been cleared.
 */
static int unsubscribe(struct stasis_app *app,
	const char *kind,
	const char *id,
	int terminate);
42
43 struct stasis_app {
44         /*! Aggregation topic for this application. */
45         struct stasis_topic *topic;
46         /*! Router for handling messages forwarded to \a topic. */
47         struct stasis_message_router *router;
48         /*! Router for handling messages to the bridge all \a topic. */
49         struct stasis_message_router *bridge_router;
50         /*! Container of the channel forwards to this app's topic. */
51         struct ao2_container *forwards;
52         /*! Callback function for this application. */
53         stasis_app_cb handler;
54         /*! Opaque data to hand to callback function. */
55         void *data;
56         /*! Name of the Stasis application */
57         char name[];
58 };
59
/*! \brief Kind of object whose topics an app_forwards entry relays. */
enum forward_type {
	FORWARD_CHANNEL,   /*!< Set by forwards_create_channel(). */
	FORWARD_BRIDGE,    /*!< Set by forwards_create_bridge(). */
	FORWARD_ENDPOINT,  /*!< Set by forwards_create_endpoint(). */
};
65
66 /*! Subscription info for a particular channel/bridge. */
67 struct app_forwards {
68         /*! Count of number of times this channel/bridge has been subscribed */
69         int interested;
70
71         /*! Forward for the regular topic */
72         struct stasis_forward *topic_forward;
73         /*! Forward for the caching topic */
74         struct stasis_forward *topic_cached_forward;
75
76         /* Type of object being forwarded */
77         enum forward_type forward_type;
78         /*! Unique id of the object being forwarded */
79         char id[];
80 };
81
82 static void forwards_dtor(void *obj)
83 {
84 #ifdef AST_DEVMODE
85         struct app_forwards *forwards = obj;
86 #endif /* AST_DEVMODE */
87
88         ast_assert(forwards->topic_forward == NULL);
89         ast_assert(forwards->topic_cached_forward == NULL);
90 }
91
92 static void forwards_unsubscribe(struct app_forwards *forwards)
93 {
94         stasis_forward_cancel(forwards->topic_forward);
95         forwards->topic_forward = NULL;
96         stasis_forward_cancel(forwards->topic_cached_forward);
97         forwards->topic_cached_forward = NULL;
98 }
99
100 static struct app_forwards *forwards_create(struct stasis_app *app,
101         const char *id)
102 {
103         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
104
105         if (!app || ast_strlen_zero(id)) {
106                 return NULL;
107         }
108
109         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
110         if (!forwards) {
111                 return NULL;
112         }
113
114         strcpy(forwards->id, id);
115
116         ao2_ref(forwards, +1);
117         return forwards;
118 }
119
120 /*! Forward a channel's topics to an app */
121 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
122         struct ast_channel *chan)
123 {
124         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
125
126         if (!app || !chan) {
127                 return NULL;
128         }
129
130         forwards = forwards_create(app, ast_channel_uniqueid(chan));
131         if (!forwards) {
132                 return NULL;
133         }
134
135         forwards->forward_type = FORWARD_CHANNEL;
136         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
137                 app->topic);
138         if (!forwards->topic_forward) {
139                 return NULL;
140         }
141
142         forwards->topic_cached_forward = stasis_forward_all(
143                 ast_channel_topic_cached(chan), app->topic);
144         if (!forwards->topic_cached_forward) {
145                 /* Half-subscribed is a bad thing */
146                 stasis_forward_cancel(forwards->topic_forward);
147                 forwards->topic_forward = NULL;
148                 return NULL;
149         }
150
151         ao2_ref(forwards, +1);
152         return forwards;
153 }
154
155 /*! Forward a bridge's topics to an app */
156 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
157         struct ast_bridge *bridge)
158 {
159         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
160
161         if (!app || !bridge) {
162                 return NULL;
163         }
164
165         forwards = forwards_create(app, bridge->uniqueid);
166         if (!forwards) {
167                 return NULL;
168         }
169
170         forwards->forward_type = FORWARD_BRIDGE;
171         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
172                 app->topic);
173         if (!forwards->topic_forward) {
174                 return NULL;
175         }
176
177         forwards->topic_cached_forward = stasis_forward_all(
178                 ast_bridge_topic_cached(bridge), app->topic);
179         if (!forwards->topic_cached_forward) {
180                 /* Half-subscribed is a bad thing */
181                 stasis_forward_cancel(forwards->topic_forward);
182                 forwards->topic_forward = NULL;
183                 return NULL;
184         }
185
186         ao2_ref(forwards, +1);
187         return forwards;
188 }
189
190 /*! Forward a endpoint's topics to an app */
191 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
192         struct ast_endpoint *endpoint)
193 {
194         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
195
196         if (!app || !endpoint) {
197                 return NULL;
198         }
199
200         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
201         if (!forwards) {
202                 return NULL;
203         }
204
205         forwards->forward_type = FORWARD_ENDPOINT;
206         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
207                 app->topic);
208         if (!forwards->topic_forward) {
209                 return NULL;
210         }
211
212         forwards->topic_cached_forward = stasis_forward_all(
213                 ast_endpoint_topic_cached(endpoint), app->topic);
214         if (!forwards->topic_cached_forward) {
215                 /* Half-subscribed is a bad thing */
216                 stasis_forward_cancel(forwards->topic_forward);
217                 forwards->topic_forward = NULL;
218                 return NULL;
219         }
220
221         ao2_ref(forwards, +1);
222         return forwards;
223 }
224
225 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
226 {
227         const struct app_forwards *object_left = obj_left;
228         const struct app_forwards *object_right = obj_right;
229         const char *right_key = obj_right;
230         int cmp;
231
232         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
233         case OBJ_POINTER:
234                 right_key = object_right->id;
235                 /* Fall through */
236         case OBJ_KEY:
237                 cmp = strcmp(object_left->id, right_key);
238                 break;
239         case OBJ_PARTIAL_KEY:
240                 /*
241                  * We could also use a partial key struct containing a length
242                  * so strlen() does not get called for every comparison instead.
243                  */
244                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
245                 break;
246         default:
247                 /* Sort can only work on something with a full or partial key. */
248                 ast_assert(0);
249                 cmp = 0;
250                 break;
251         }
252         return cmp;
253 }
254
255 static void app_dtor(void *obj)
256 {
257         struct stasis_app *app = obj;
258
259         ast_verb(1, "Destroying Stasis app %s\n", app->name);
260
261         ast_assert(app->router == NULL);
262         ast_assert(app->bridge_router == NULL);
263
264         ao2_cleanup(app->topic);
265         app->topic = NULL;
266         ao2_cleanup(app->forwards);
267         app->forwards = NULL;
268         ao2_cleanup(app->data);
269         app->data = NULL;
270 }
271
272 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
273 {
274         struct ast_multi_channel_blob *payload = stasis_message_data(message);
275         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
276         struct ast_channel *chan;
277
278         if (!snapshot) {
279                 return;
280         }
281
282         chan = ast_channel_get_by_name(snapshot->uniqueid);
283         if (!chan) {
284                 return;
285         }
286
287         app_subscribe_channel(app, chan);
288         ast_channel_unref(chan);
289 }
290
291 static void sub_default_handler(void *data, struct stasis_subscription *sub,
292         struct stasis_message *message)
293 {
294         struct stasis_app *app = data;
295         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
296
297         if (stasis_subscription_final_message(sub, message)) {
298                 ao2_cleanup(app);
299         }
300
301         if (stasis_message_type(message) == ast_channel_dial_type()) {
302                 call_forwarded_handler(app, message);
303         }
304
305         if (stasis_message_type(message) == app_end_message_type()) {
306                 app_end_message_handler(message);
307         }
308
309         /* By default, send any message that has a JSON representation */
310         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
311         if (!json) {
312                 return;
313         }
314
315         app_send(app, json);
316 }
317
/*!
 * \brief Signature of a channel snapshot monitor.
 *
 * A monitor receives the previous and current snapshot of a cache update
 * (either may be NULL) plus the timestamp to report, and returns a JSON
 * event, or NULL when it has nothing to report.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
323
324 static struct ast_json *simple_channel_event(
325         const char *type,
326         struct ast_channel_snapshot *snapshot,
327         const struct timeval *tv)
328 {
329         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
330
331         if (!json_channel) {
332                 return NULL;
333         }
334
335         return ast_json_pack("{s: s, s: o, s: o}",
336                 "type", type,
337                 "timestamp", ast_json_timeval(*tv, NULL),
338                 "channel", json_channel);
339 }
340
/*! \brief Build a "ChannelCreated" event for \a snapshot. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelCreated", snapshot, tv);

	return event;
}
347
348 static struct ast_json *channel_destroyed_event(
349         struct ast_channel_snapshot *snapshot,
350         const struct timeval *tv)
351 {
352         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
353
354         if (!json_channel) {
355                 return NULL;
356         }
357
358         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
359                 "type", "ChannelDestroyed",
360                 "timestamp", ast_json_timeval(*tv, NULL),
361                 "cause", snapshot->hangupcause,
362                 "cause_txt", ast_cause2str(snapshot->hangupcause),
363                 "channel", json_channel);
364 }
365
/*! \brief Build a "ChannelStateChange" event for \a snapshot. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelStateChange", snapshot, tv);

	return event;
}
372
373 /*! \brief Handle channel state changes */
374 static struct ast_json *channel_state(
375         struct ast_channel_snapshot *old_snapshot,
376         struct ast_channel_snapshot *new_snapshot,
377         const struct timeval *tv)
378 {
379         struct ast_channel_snapshot *snapshot = new_snapshot ?
380                 new_snapshot : old_snapshot;
381
382         if (!old_snapshot) {
383                 return channel_created_event(snapshot, tv);
384         } else if (!new_snapshot) {
385                 return channel_destroyed_event(snapshot, tv);
386         } else if (old_snapshot->state != new_snapshot->state) {
387                 return channel_state_change_event(snapshot, tv);
388         }
389
390         return NULL;
391 }
392
393 static struct ast_json *channel_dialplan(
394         struct ast_channel_snapshot *old_snapshot,
395         struct ast_channel_snapshot *new_snapshot,
396         const struct timeval *tv)
397 {
398         struct ast_json *json_channel;
399
400         /* No Newexten event on cache clear or first event */
401         if (!old_snapshot || !new_snapshot) {
402                 return NULL;
403         }
404
405         /* Empty application is not valid for a Newexten event */
406         if (ast_strlen_zero(new_snapshot->appl)) {
407                 return NULL;
408         }
409
410         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
411                 return NULL;
412         }
413
414         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
415         if (!json_channel) {
416                 return NULL;
417         }
418
419         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
420                 "type", "ChannelDialplan",
421                 "timestamp", ast_json_timeval(*tv, NULL),
422                 "dialplan_app", new_snapshot->appl,
423                 "dialplan_app_data", new_snapshot->data,
424                 "channel", json_channel);
425 }
426
427 static struct ast_json *channel_callerid(
428         struct ast_channel_snapshot *old_snapshot,
429         struct ast_channel_snapshot *new_snapshot,
430         const struct timeval *tv)
431 {
432         struct ast_json *json_channel;
433
434         /* No NewCallerid event on cache clear or first event */
435         if (!old_snapshot || !new_snapshot) {
436                 return NULL;
437         }
438
439         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
440                 return NULL;
441         }
442
443         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
444         if (!json_channel) {
445                 return NULL;
446         }
447
448         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
449                 "type", "ChannelCallerId",
450                 "timestamp", ast_json_timeval(*tv, NULL),
451                 "caller_presentation", new_snapshot->caller_pres,
452                 "caller_presentation_txt", ast_describe_caller_presentation(
453                         new_snapshot->caller_pres),
454                 "channel", json_channel);
455 }
456
457 static channel_snapshot_monitor channel_monitors[] = {
458         channel_state,
459         channel_dialplan,
460         channel_callerid,
461 };
462
463 static void sub_channel_update_handler(void *data,
464         struct stasis_subscription *sub,
465         struct stasis_message *message)
466 {
467         struct stasis_app *app = data;
468         struct stasis_cache_update *update;
469         struct ast_channel_snapshot *new_snapshot;
470         struct ast_channel_snapshot *old_snapshot;
471         const struct timeval *tv;
472         int i;
473
474         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
475
476         update = stasis_message_data(message);
477
478         ast_assert(update->type == ast_channel_snapshot_type());
479
480         new_snapshot = stasis_message_data(update->new_snapshot);
481         old_snapshot = stasis_message_data(update->old_snapshot);
482
483         /* Pull timestamp from the new snapshot, or from the update message
484          * when there isn't one. */
485         tv = update->new_snapshot ?
486                 stasis_message_timestamp(update->new_snapshot) :
487                 stasis_message_timestamp(message);
488
489         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
490                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
491
492                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
493                 if (msg) {
494                         app_send(app, msg);
495                 }
496         }
497
498         if (!new_snapshot && old_snapshot) {
499                 unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
500         }
501 }
502
503 static struct ast_json *simple_endpoint_event(
504         const char *type,
505         struct ast_endpoint_snapshot *snapshot,
506         const struct timeval *tv)
507 {
508         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
509
510         if (!json_endpoint) {
511                 return NULL;
512         }
513
514         return ast_json_pack("{s: s, s: o, s: o}",
515                 "type", type,
516                 "timestamp", ast_json_timeval(*tv, NULL),
517                 "endpoint", json_endpoint);
518 }
519
520 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
521 {
522         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
523         struct ast_json *json_endpoint;
524         struct stasis_app *app = pvt;
525         char *tech;
526         char *resource;
527
528         tech = ast_strdupa(endpoint_id);
529         resource = strchr(tech, '/');
530         if (resource) {
531                 resource[0] = '\0';
532                 resource++;
533         }
534
535         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
536                 return -1;
537         }
538
539         snapshot = ast_endpoint_latest_snapshot(tech, resource);
540         if (!snapshot) {
541                 return -1;
542         }
543
544         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
545         if (!json_endpoint) {
546                 return -1;
547         }
548
549         app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
550                 "type", "TextMessageReceived",
551                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
552                 "endpoint", json_endpoint,
553                 "message", json_msg));
554
555         return 0;
556 }
557
558 static void sub_endpoint_update_handler(void *data,
559         struct stasis_subscription *sub,
560         struct stasis_message *message)
561 {
562         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
563         struct stasis_app *app = data;
564         struct stasis_cache_update *update;
565         struct ast_endpoint_snapshot *new_snapshot;
566         struct ast_endpoint_snapshot *old_snapshot;
567         const struct timeval *tv;
568
569         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
570
571         update = stasis_message_data(message);
572
573         ast_assert(update->type == ast_endpoint_snapshot_type());
574
575         new_snapshot = stasis_message_data(update->new_snapshot);
576         old_snapshot = stasis_message_data(update->old_snapshot);
577
578         if (new_snapshot) {
579                 tv = stasis_message_timestamp(update->new_snapshot);
580
581                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
582                 if (!json) {
583                         return;
584                 }
585
586                 app_send(app, json);
587         }
588
589         if (!new_snapshot && old_snapshot) {
590                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
591         }
592 }
593
594 static struct ast_json *simple_bridge_event(
595         const char *type,
596         struct ast_bridge_snapshot *snapshot,
597         const struct timeval *tv)
598 {
599         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
600         if (!json_bridge) {
601                 return NULL;
602         }
603
604         return ast_json_pack("{s: s, s: o, s: o}",
605                 "type", type,
606                 "timestamp", ast_json_timeval(*tv, NULL),
607                 "bridge", json_bridge);
608 }
609
610 static void sub_bridge_update_handler(void *data,
611         struct stasis_subscription *sub,
612         struct stasis_message *message)
613 {
614         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
615         struct stasis_app *app = data;
616         struct stasis_cache_update *update;
617         struct ast_bridge_snapshot *new_snapshot;
618         struct ast_bridge_snapshot *old_snapshot;
619         const struct timeval *tv;
620
621         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
622
623         update = stasis_message_data(message);
624
625         ast_assert(update->type == ast_bridge_snapshot_type());
626
627         new_snapshot = stasis_message_data(update->new_snapshot);
628         old_snapshot = stasis_message_data(update->old_snapshot);
629         tv = update->new_snapshot ?
630                 stasis_message_timestamp(update->new_snapshot) :
631                 stasis_message_timestamp(message);
632
633         if (!new_snapshot) {
634                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
635         } else if (!old_snapshot) {
636                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
637         }
638
639         if (json) {
640                 app_send(app, json);
641         }
642
643         if (!new_snapshot && old_snapshot) {
644                 unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
645         }
646 }
647
648
649 /*! \brief Helper function for determining if the application is subscribed to a given entity */
650 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
651 {
652         struct app_forwards *forwards = NULL;
653
654         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
655         if (!forwards) {
656                 return 0;
657         }
658
659         ao2_ref(forwards, -1);
660         return 1;
661 }
662
663 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
664         struct stasis_message *message)
665 {
666         struct stasis_app *app = data;
667         struct ast_bridge_merge_message *merge;
668
669         merge = stasis_message_data(message);
670
671         /* Find out if we're subscribed to either bridge */
672         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
673                 bridge_app_subscribed(app, merge->to->uniqueid)) {
674                 /* Forward the message to the app */
675                 stasis_publish(app->topic, message);
676         }
677 }
678
679 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
680 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
681 {
682         int subscribed = 0;
683         struct ao2_iterator iter;
684         char *uniqueid;
685
686         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
687                 return 1;
688         }
689
690         iter = ao2_iterator_init(snapshot->channels, 0);
691         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
692                 if (bridge_app_subscribed(app, uniqueid)) {
693                         subscribed = 1;
694                         ao2_ref(uniqueid, -1);
695                         break;
696                 }
697         }
698         ao2_iterator_destroy(&iter);
699
700         return subscribed;
701 }
702
703 static void set_replacement_channel(struct ast_channel_snapshot *to_be_replaced,
704                 struct ast_channel_snapshot *replacing)
705 {
706         struct stasis_app_control *control = stasis_app_control_find_by_channel_id(
707                 to_be_replaced->uniqueid);
708         struct ast_channel *chan = ast_channel_get_by_name(replacing->uniqueid);
709
710         if (control && chan) {
711                 ast_channel_lock(chan);
712                 app_set_replace_channel_app(chan, app_name(control_app(control)));
713                 app_set_replace_channel_snapshot(chan, to_be_replaced);
714                 ast_channel_unlock(chan);
715         }
716         ast_channel_cleanup(chan);
717         ao2_cleanup(control);
718 }
719
720 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
721         struct stasis_message *message)
722 {
723         struct stasis_app *app = data;
724         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
725         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
726
727         if (transfer_msg->replace_channel) {
728                 set_replacement_channel(transfer_msg->transferer, transfer_msg->replace_channel);
729         }
730
731         if (bridge_app_subscribed(app, transfer_msg->transferer->uniqueid) ||
732                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
733                 stasis_publish(app->topic, message);
734         }
735 }
736
737 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
738         struct stasis_message *message)
739 {
740         struct stasis_app *app = data;
741         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
742         int subscribed = 0;
743
744         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
745         if (!subscribed) {
746                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
747         }
748         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
749                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
750         }
751         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
752                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
753         }
754
755         if (!subscribed) {
756                 switch (transfer_msg->dest_type) {
757                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
758                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
759                         break;
760                 case AST_ATTENDED_TRANSFER_DEST_LINK:
761                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
762                         if (!subscribed) {
763                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
764                         }
765                         break;
766                 break;
767                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
768                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
769                         if (!subscribed) {
770                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
771                         }
772                         break;
773                 default:
774                         break;
775                 }
776         }
777
778         if (subscribed) {
779                 stasis_publish(app->topic, message);
780         }
781
782         if (transfer_msg->replace_channel) {
783                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
784                                 transfer_msg->replace_channel);
785         }
786
787         if (transfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_LINK) {
788                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
789                                 transfer_msg->dest.links[0]);
790                 set_replacement_channel(transfer_msg->to_transfer_target.channel_snapshot,
791                                 transfer_msg->dest.links[1]);
792         }
793 }
794
/*!
 * \brief Default handler for the bridge router.
 *
 * Ignores every message except the subscription's final one, on which the
 * reference the bridge router holds on the app is released.
 */
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
804
805 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
806 {
807         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
808         size_t size;
809         int res = 0;
810
811         ast_assert(name != NULL);
812         ast_assert(handler != NULL);
813
814         ast_verb(1, "Creating Stasis app '%s'\n", name);
815
816         size = sizeof(*app) + strlen(name) + 1;
817         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
818
819         if (!app) {
820                 return NULL;
821         }
822
823         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
824                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
825                 forwards_sort, NULL);
826         if (!app->forwards) {
827                 return NULL;
828         }
829
830         app->topic = stasis_topic_create(name);
831         if (!app->topic) {
832                 return NULL;
833         }
834
835         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
836         if (!app->bridge_router) {
837                 return NULL;
838         }
839
840         res |= stasis_message_router_add(app->bridge_router,
841                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
842
843         res |= stasis_message_router_add(app->bridge_router,
844                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
845
846         res |= stasis_message_router_add(app->bridge_router,
847                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
848
849         res |= stasis_message_router_set_default(app->bridge_router,
850                 bridge_default_handler, app);
851
852         if (res != 0) {
853                 return NULL;
854         }
855         /* Bridge router holds a reference */
856         ao2_ref(app, +1);
857
858         app->router = stasis_message_router_create(app->topic);
859         if (!app->router) {
860                 return NULL;
861         }
862
863         res |= stasis_message_router_add_cache_update(app->router,
864                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
865
866         res |= stasis_message_router_add_cache_update(app->router,
867                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
868
869         res |= stasis_message_router_add_cache_update(app->router,
870                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
871
872         res |= stasis_message_router_set_default(app->router,
873                 sub_default_handler, app);
874
875         if (res != 0) {
876                 return NULL;
877         }
878         /* Router holds a reference */
879         ao2_ref(app, +1);
880
881         strncpy(app->name, name, size - sizeof(*app));
882         app->handler = handler;
883         ao2_ref(data, +1);
884         app->data = data;
885
886         ao2_ref(app, +1);
887         return app;
888 }
889
890 struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
891         return app->topic;
892 }
893
894 /*!
895  * \brief Send a message to the given application.
896  * \param app App to send the message to.
897  * \param message Message to send.
898  */
899 void app_send(struct stasis_app *app, struct ast_json *message)
900 {
901         stasis_app_cb handler;
902         RAII_VAR(void *, data, NULL, ao2_cleanup);
903
904         /* Copy off mutable state with lock held */
905         {
906                 SCOPED_AO2LOCK(lock, app);
907                 handler = app->handler;
908                 if (app->data) {
909                         ao2_ref(app->data, +1);
910                         data = app->data;
911                 }
912                 /* Name is immutable; no need to copy */
913         }
914
915         if (!handler) {
916                 ast_verb(3,
917                         "Inactive Stasis app '%s' missed message\n", app->name);
918                 return;
919         }
920
921         handler(data, app->name, message);
922 }
923
924 void app_deactivate(struct stasis_app *app)
925 {
926         SCOPED_AO2LOCK(lock, app);
927         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
928         app->handler = NULL;
929         ao2_cleanup(app->data);
930         app->data = NULL;
931 }
932
933 void app_shutdown(struct stasis_app *app)
934 {
935         SCOPED_AO2LOCK(lock, app);
936
937         ast_assert(app_is_finished(app));
938
939         stasis_message_router_unsubscribe(app->router);
940         app->router = NULL;
941         stasis_message_router_unsubscribe(app->bridge_router);
942         app->bridge_router = NULL;
943 }
944
945 int app_is_active(struct stasis_app *app)
946 {
947         SCOPED_AO2LOCK(lock, app);
948         return app->handler != NULL;
949 }
950
951 int app_is_finished(struct stasis_app *app)
952 {
953         SCOPED_AO2LOCK(lock, app);
954
955         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
956 }
957
958 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
959 {
960         SCOPED_AO2LOCK(lock, app);
961
962         if (app->handler) {
963                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
964
965                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
966
967                 msg = ast_json_pack("{s: s, s: s}",
968                         "type", "ApplicationReplaced",
969                         "application", app->name);
970                 if (msg) {
971                         app_send(app, msg);
972                 }
973         } else {
974                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
975         }
976
977         app->handler = handler;
978         ao2_cleanup(app->data);
979         if (data) {
980                 ao2_ref(data, +1);
981         }
982         app->data = data;
983 }
984
985 const char *app_name(const struct stasis_app *app)
986 {
987         return app->name;
988 }
989
990 struct ast_json *app_to_json(const struct stasis_app *app)
991 {
992         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
993         struct ast_json *channels;
994         struct ast_json *bridges;
995         struct ast_json *endpoints;
996         struct ao2_iterator i;
997         void *obj;
998
999         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1000                 "name", app->name,
1001                 "channel_ids", "bridge_ids", "endpoint_ids");
1002         channels = ast_json_object_get(json, "channel_ids");
1003         bridges = ast_json_object_get(json, "bridge_ids");
1004         endpoints = ast_json_object_get(json, "endpoint_ids");
1005
1006         i = ao2_iterator_init(app->forwards, 0);
1007         while ((obj = ao2_iterator_next(&i))) {
1008                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
1009                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
1010                 int append_res = -1;
1011
1012                 id = ast_json_string_create(forwards->id);
1013
1014                 switch (forwards->forward_type) {
1015                 case FORWARD_CHANNEL:
1016                         append_res = ast_json_array_append(channels,
1017                                 ast_json_ref(id));
1018                         break;
1019                 case FORWARD_BRIDGE:
1020                         append_res = ast_json_array_append(bridges,
1021                                 ast_json_ref(id));
1022                         break;
1023                 case FORWARD_ENDPOINT:
1024                         append_res = ast_json_array_append(endpoints,
1025                                 ast_json_ref(id));
1026                         break;
1027                 }
1028
1029                 if (append_res != 0) {
1030                         ast_log(LOG_ERROR, "Error building response\n");
1031                         ao2_iterator_destroy(&i);
1032                         return NULL;
1033                 }
1034         }
1035         ao2_iterator_destroy(&i);
1036
1037         return ast_json_ref(json);
1038 }
1039
1040 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1041 {
1042         int res;
1043
1044         if (!app || !chan) {
1045                 return -1;
1046         } else {
1047                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1048                 SCOPED_AO2LOCK(lock, app->forwards);
1049
1050                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
1051                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1052                 if (!forwards) {
1053                         /* Forwards not found, create one */
1054                         forwards = forwards_create_channel(app, chan);
1055                         if (!forwards) {
1056                                 return -1;
1057                         }
1058
1059                         res = ao2_link_flags(app->forwards, forwards,
1060                                 OBJ_NOLOCK);
1061                         if (!res) {
1062                                 return -1;
1063                         }
1064                 }
1065
1066                 ++forwards->interested;
1067                 ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
1068                 return 0;
1069         }
1070 }
1071
/*!
 * \brief Event-source adapter: subscribe \a app to the channel in \a obj.
 *
 * \param app Application to subscribe.
 * \param obj Untyped pointer forwarded to app_subscribe_channel() as the channel.
 * \return Result of app_subscribe_channel().
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1076
1077 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1078 {
1079         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1080         SCOPED_AO2LOCK(lock, app->forwards);
1081
1082         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1083         if (!forwards) {
1084                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1085                 return -1;
1086         }
1087         forwards->interested--;
1088
1089         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1090         if (forwards->interested == 0 || terminate) {
1091                 /* No one is interested any more; unsubscribe */
1092                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1093                 forwards_unsubscribe(forwards);
1094                 ao2_find(app->forwards, forwards,
1095                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1096                         OBJ_NODATA);
1097
1098                 if (!strcmp(kind, "endpoint")) {
1099                         messaging_app_unsubscribe_endpoint(app->name, id);
1100                 }
1101         }
1102
1103         return 0;
1104 }
1105
/*!
 * \brief Unsubscribe an application from a channel.
 *
 * \param app Application to unsubscribe.
 * \param chan Channel whose unique id identifies the subscription.
 * \retval -1 if \a app or \a chan is NULL.
 * \return Otherwise, the result of app_unsubscribe_channel_id().
 */
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
	}

	return -1;
}
1114
/*!
 * \brief Unsubscribe an application from a channel, by channel id.
 *
 * \param app Application to unsubscribe.
 * \param channel_id Key of the channel's forwards entry.
 * \retval -1 if \a app or \a channel_id is NULL.
 * \return Otherwise, the result of unsubscribe() for kind "channel" without
 *         forced termination.
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	return (app && channel_id)
		? unsubscribe(app, "channel", channel_id, 0)
		: -1;
}
1123
1124 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1125 {
1126         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1127         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1128         return forwards != NULL;
1129 }
1130
1131 int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan)
1132 {
1133         RAII_VAR(struct app_forwards *, old_forwards, NULL, ao2_cleanup);
1134         struct app_forwards *new_forwards;
1135
1136         old_forwards = ao2_find(app->forwards, old_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
1137         if (!old_forwards) {
1138                 return -1;
1139         }
1140
1141         new_forwards = forwards_create_channel(app, new_chan);
1142         if (!new_forwards) {
1143                 return -1;
1144         }
1145
1146         new_forwards->interested = old_forwards->interested;
1147         ao2_link_flags(app->forwards, new_forwards, 0);
1148         ao2_cleanup(new_forwards);
1149
1150         /* Clean up old forwards */
1151         forwards_unsubscribe(old_forwards);
1152         return 0;
1153 }
1154
/*!
 * \brief Event-source lookup: find a channel by the given id string.
 *
 * \param app Unused.
 * \param id Identifier passed to ast_channel_get_by_name().
 * \return Whatever ast_channel_get_by_name() returns for \a id.
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	struct ast_channel *chan = ast_channel_get_by_name(id);

	return chan;
}
1159
1160 struct stasis_app_event_source channel_event_source = {
1161         .scheme = "channel:",
1162         .find = channel_find,
1163         .subscribe = subscribe_channel,
1164         .unsubscribe = app_unsubscribe_channel_id,
1165         .is_subscribed = app_is_subscribed_channel_id
1166 };
1167
1168 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1169 {
1170         if (!app || !bridge) {
1171                 return -1;
1172         } else {
1173                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1174                 SCOPED_AO2LOCK(lock, app->forwards);
1175
1176                 forwards = ao2_find(app->forwards, bridge->uniqueid,
1177                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1178
1179                 if (!forwards) {
1180                         /* Forwards not found, create one */
1181                         forwards = forwards_create_bridge(app, bridge);
1182                         if (!forwards) {
1183                                 return -1;
1184                         }
1185                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1186                 }
1187
1188                 ++forwards->interested;
1189                 ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
1190                 return 0;
1191         }
1192 }
1193
/*!
 * \brief Event-source adapter: subscribe \a app to the bridge in \a obj.
 *
 * \param app Application to subscribe.
 * \param obj Untyped pointer forwarded to app_subscribe_bridge() as the bridge.
 * \return Result of app_subscribe_bridge().
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1198
1199 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1200 {
1201         if (!app || !bridge) {
1202                 return -1;
1203         }
1204
1205         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
1206 }
1207
/*!
 * \brief Unsubscribe an application from a bridge, by bridge id.
 *
 * \param app Application to unsubscribe.
 * \param bridge_id Key of the bridge's forwards entry.
 * \retval -1 if \a app or \a bridge_id is NULL.
 * \return Otherwise, the result of unsubscribe() for kind "bridge" without
 *         forced termination.
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	return (app && bridge_id)
		? unsubscribe(app, "bridge", bridge_id, 0)
		: -1;
}
1216
1217 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1218 {
1219         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1220         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1221         return forwards != NULL;
1222 }
1223
/*!
 * \brief Event-source lookup: find a bridge by id.
 *
 * \param app Unused.
 * \param id Identifier passed to stasis_app_bridge_find_by_id().
 * \return Whatever stasis_app_bridge_find_by_id() returns for \a id.
 */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	struct ast_bridge *bridge = stasis_app_bridge_find_by_id(id);

	return bridge;
}
1228
1229 struct stasis_app_event_source bridge_event_source = {
1230         .scheme = "bridge:",
1231         .find = bridge_find,
1232         .subscribe = subscribe_bridge,
1233         .unsubscribe = app_unsubscribe_bridge_id,
1234         .is_subscribed = app_is_subscribed_bridge_id
1235 };
1236
1237 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1238 {
1239         if (!app || !endpoint) {
1240                 return -1;
1241         } else {
1242                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1243                 SCOPED_AO2LOCK(lock, app->forwards);
1244
1245                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
1246                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1247
1248                 if (!forwards) {
1249                         /* Forwards not found, create one */
1250                         forwards = forwards_create_endpoint(app, endpoint);
1251                         if (!forwards) {
1252                                 return -1;
1253                         }
1254                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1255
1256                         /* Subscribe for messages */
1257                         messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1258                 }
1259
1260                 ++forwards->interested;
1261                 ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
1262                 return 0;
1263         }
1264 }
1265
/*!
 * \brief Event-source adapter: subscribe \a app to the endpoint in \a obj.
 *
 * \param app Application to subscribe.
 * \param obj Untyped pointer forwarded to app_subscribe_endpoint() as the endpoint.
 * \return Result of app_subscribe_endpoint().
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1270
/*!
 * \brief Unsubscribe an application from an endpoint, by endpoint id.
 *
 * \param app Application to unsubscribe.
 * \param endpoint_id Key of the endpoint's forwards entry.
 * \retval -1 if \a app or \a endpoint_id is NULL.
 * \return Otherwise, the result of unsubscribe() for kind "endpoint" without
 *         forced termination.
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	return (app && endpoint_id)
		? unsubscribe(app, "endpoint", endpoint_id, 0)
		: -1;
}
1279
1280 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1281 {
1282         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1283         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1284         return forwards != NULL;
1285 }
1286
/*!
 * \brief Event-source lookup: find an endpoint by id.
 *
 * \param app Unused.
 * \param id Identifier passed to ast_endpoint_find_by_id().
 * \return Whatever ast_endpoint_find_by_id() returns for \a id.
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	struct ast_endpoint *endpoint = ast_endpoint_find_by_id(id);

	return endpoint;
}
1291
1292 struct stasis_app_event_source endpoint_event_source = {
1293         .scheme = "endpoint:",
1294         .find = endpoint_find,
1295         .subscribe = subscribe_endpoint,
1296         .unsubscribe = app_unsubscribe_endpoint_id,
1297         .is_subscribed = app_is_subscribed_endpoint_id
1298 };
1299
1300 void stasis_app_register_event_sources(void)
1301 {
1302         stasis_app_register_event_source(&channel_event_source);
1303         stasis_app_register_event_source(&bridge_event_source);
1304         stasis_app_register_event_source(&endpoint_event_source);
1305 }
1306
1307 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1308 {
1309         return obj == &endpoint_event_source ||
1310                 obj == &bridge_event_source ||
1311                 obj == &channel_event_source;
1312 }
1313
1314 void stasis_app_unregister_event_sources(void)
1315 {
1316         stasis_app_unregister_event_source(&endpoint_event_source);
1317         stasis_app_unregister_event_source(&bridge_event_source);
1318         stasis_app_unregister_event_source(&channel_event_source);
1319 }
1320
1321