7459696152934b5f54cd893bfc2f632d9f31da62
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31 #include "control.h"
32 #include "messaging.h"
33
34 #include "asterisk/callerid.h"
35 #include "asterisk/stasis_app.h"
36 #include "asterisk/stasis_bridges.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40
/*
 * Forward declaration so the cache-update handlers below can call it.
 * Those handlers pass "channel", "bridge" or "endpoint" as \a kind.
 */
static int unsubscribe(struct stasis_app *app, const char *kind,
	const char *id, int terminate);
42
43 struct stasis_app {
44         /*! Aggregation topic for this application. */
45         struct stasis_topic *topic;
46         /*! Router for handling messages forwarded to \a topic. */
47         struct stasis_message_router *router;
48         /*! Router for handling messages to the bridge all \a topic. */
49         struct stasis_message_router *bridge_router;
50         /*! Container of the channel forwards to this app's topic. */
51         struct ao2_container *forwards;
52         /*! Callback function for this application. */
53         stasis_app_cb handler;
54         /*! Opaque data to hand to callback function. */
55         void *data;
56         /*! Name of the Stasis application */
57         char name[];
58 };
59
/*! Kind of object whose topics an app_forwards entry forwards. */
enum forward_type {
	/*! Forwards created by forwards_create_channel() */
	FORWARD_CHANNEL,
	/*! Forwards created by forwards_create_bridge() */
	FORWARD_BRIDGE,
	/*! Forwards created by forwards_create_endpoint() */
	FORWARD_ENDPOINT,
};
65
66 /*! Subscription info for a particular channel/bridge. */
67 struct app_forwards {
68         /*! Count of number of times this channel/bridge has been subscribed */
69         int interested;
70
71         /*! Forward for the regular topic */
72         struct stasis_forward *topic_forward;
73         /*! Forward for the caching topic */
74         struct stasis_forward *topic_cached_forward;
75
76         /* Type of object being forwarded */
77         enum forward_type forward_type;
78         /*! Unique id of the object being forwarded */
79         char id[];
80 };
81
82 static void forwards_dtor(void *obj)
83 {
84 #ifdef AST_DEVMODE
85         struct app_forwards *forwards = obj;
86 #endif /* AST_DEVMODE */
87
88         ast_assert(forwards->topic_forward == NULL);
89         ast_assert(forwards->topic_cached_forward == NULL);
90 }
91
92 static void forwards_unsubscribe(struct app_forwards *forwards)
93 {
94         stasis_forward_cancel(forwards->topic_forward);
95         forwards->topic_forward = NULL;
96         stasis_forward_cancel(forwards->topic_cached_forward);
97         forwards->topic_cached_forward = NULL;
98 }
99
100 static struct app_forwards *forwards_create(struct stasis_app *app,
101         const char *id)
102 {
103         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
104
105         if (!app || ast_strlen_zero(id)) {
106                 return NULL;
107         }
108
109         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
110         if (!forwards) {
111                 return NULL;
112         }
113
114         strcpy(forwards->id, id);
115
116         ao2_ref(forwards, +1);
117         return forwards;
118 }
119
120 /*! Forward a channel's topics to an app */
121 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
122         struct ast_channel *chan)
123 {
124         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
125
126         if (!app || !chan) {
127                 return NULL;
128         }
129
130         forwards = forwards_create(app, ast_channel_uniqueid(chan));
131         if (!forwards) {
132                 return NULL;
133         }
134
135         forwards->forward_type = FORWARD_CHANNEL;
136         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
137                 app->topic);
138         if (!forwards->topic_forward) {
139                 return NULL;
140         }
141
142         forwards->topic_cached_forward = stasis_forward_all(
143                 ast_channel_topic_cached(chan), app->topic);
144         if (!forwards->topic_cached_forward) {
145                 /* Half-subscribed is a bad thing */
146                 stasis_forward_cancel(forwards->topic_forward);
147                 forwards->topic_forward = NULL;
148                 return NULL;
149         }
150
151         ao2_ref(forwards, +1);
152         return forwards;
153 }
154
155 /*! Forward a bridge's topics to an app */
156 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
157         struct ast_bridge *bridge)
158 {
159         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
160
161         if (!app || !bridge) {
162                 return NULL;
163         }
164
165         forwards = forwards_create(app, bridge->uniqueid);
166         if (!forwards) {
167                 return NULL;
168         }
169
170         forwards->forward_type = FORWARD_BRIDGE;
171         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
172                 app->topic);
173         if (!forwards->topic_forward) {
174                 return NULL;
175         }
176
177         forwards->topic_cached_forward = stasis_forward_all(
178                 ast_bridge_topic_cached(bridge), app->topic);
179         if (!forwards->topic_cached_forward) {
180                 /* Half-subscribed is a bad thing */
181                 stasis_forward_cancel(forwards->topic_forward);
182                 forwards->topic_forward = NULL;
183                 return NULL;
184         }
185
186         ao2_ref(forwards, +1);
187         return forwards;
188 }
189
190 /*! Forward a endpoint's topics to an app */
191 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
192         struct ast_endpoint *endpoint)
193 {
194         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
195
196         if (!app || !endpoint) {
197                 return NULL;
198         }
199
200         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
201         if (!forwards) {
202                 return NULL;
203         }
204
205         forwards->forward_type = FORWARD_ENDPOINT;
206         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
207                 app->topic);
208         if (!forwards->topic_forward) {
209                 return NULL;
210         }
211
212         forwards->topic_cached_forward = stasis_forward_all(
213                 ast_endpoint_topic_cached(endpoint), app->topic);
214         if (!forwards->topic_cached_forward) {
215                 /* Half-subscribed is a bad thing */
216                 stasis_forward_cancel(forwards->topic_forward);
217                 forwards->topic_forward = NULL;
218                 return NULL;
219         }
220
221         ao2_ref(forwards, +1);
222         return forwards;
223 }
224
225 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
226 {
227         const struct app_forwards *object_left = obj_left;
228         const struct app_forwards *object_right = obj_right;
229         const char *right_key = obj_right;
230         int cmp;
231
232         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
233         case OBJ_POINTER:
234                 right_key = object_right->id;
235                 /* Fall through */
236         case OBJ_KEY:
237                 cmp = strcmp(object_left->id, right_key);
238                 break;
239         case OBJ_PARTIAL_KEY:
240                 /*
241                  * We could also use a partial key struct containing a length
242                  * so strlen() does not get called for every comparison instead.
243                  */
244                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
245                 break;
246         default:
247                 /* Sort can only work on something with a full or partial key. */
248                 ast_assert(0);
249                 cmp = 0;
250                 break;
251         }
252         return cmp;
253 }
254
255 static void app_dtor(void *obj)
256 {
257         struct stasis_app *app = obj;
258
259         ast_verb(1, "Destroying Stasis app %s\n", app->name);
260
261         ast_assert(app->router == NULL);
262         ast_assert(app->bridge_router == NULL);
263
264         ao2_cleanup(app->topic);
265         app->topic = NULL;
266         ao2_cleanup(app->forwards);
267         app->forwards = NULL;
268         ao2_cleanup(app->data);
269         app->data = NULL;
270 }
271
272 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
273 {
274         struct ast_multi_channel_blob *payload = stasis_message_data(message);
275         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
276         struct ast_channel *chan;
277
278         if (!snapshot) {
279                 return;
280         }
281
282         chan = ast_channel_get_by_name(snapshot->uniqueid);
283         if (!chan) {
284                 return;
285         }
286
287         app_subscribe_channel(app, chan);
288         ast_channel_unref(chan);
289 }
290
/*!
 * \brief Default handler for messages arriving on the app's router.
 *
 * Dial messages are first checked for a forwarded channel. Any message that
 * has a JSON representation is then sent to the application.
 *
 * \param data The stasis_app this router belongs to.
 * \param sub Subscription the message arrived on.
 * \param message Message to handle.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;
	struct ast_json *json;
	int is_final = stasis_subscription_final_message(sub, message);

	if (stasis_message_type(message) == ast_channel_dial_type()) {
		call_forwarded_handler(app, message);
	}

	/* By default, send any message that has a JSON representation */
	json = stasis_message_to_json(message, stasis_app_get_sanitizer());
	if (json) {
		app_send(app, json);
		ast_json_unref(json);
	}

	/*
	 * Release the router's reference only after the last use of app:
	 * dropping it earlier could free app while it is still needed above.
	 */
	if (is_final) {
		ao2_cleanup(app);
	}
}
313
/*!
 * \brief Callback type run on every channel snapshot update.
 *
 * Receives the old and new snapshots plus the update timestamp and returns
 * the JSON event to send, or NULL when the change is of no interest to it.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
319
320 static struct ast_json *simple_channel_event(
321         const char *type,
322         struct ast_channel_snapshot *snapshot,
323         const struct timeval *tv)
324 {
325         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
326
327         if (!json_channel) {
328                 return NULL;
329         }
330
331         return ast_json_pack("{s: s, s: o, s: o}",
332                 "type", type,
333                 "timestamp", ast_json_timeval(*tv, NULL),
334                 "channel", json_channel);
335 }
336
/*! \brief Build the "ChannelCreated" event for \a snapshot. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	static const char event_type[] = "ChannelCreated";

	return simple_channel_event(event_type, snapshot, tv);
}
343
344 static struct ast_json *channel_destroyed_event(
345         struct ast_channel_snapshot *snapshot,
346         const struct timeval *tv)
347 {
348         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
349
350         if (!json_channel) {
351                 return NULL;
352         }
353
354         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
355                 "type", "ChannelDestroyed",
356                 "timestamp", ast_json_timeval(*tv, NULL),
357                 "cause", snapshot->hangupcause,
358                 "cause_txt", ast_cause2str(snapshot->hangupcause),
359                 "channel", json_channel);
360 }
361
/*! \brief Build the "ChannelStateChange" event for \a snapshot. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	static const char event_type[] = "ChannelStateChange";

	return simple_channel_event(event_type, snapshot, tv);
}
368
369 /*! \brief Handle channel state changes */
370 static struct ast_json *channel_state(
371         struct ast_channel_snapshot *old_snapshot,
372         struct ast_channel_snapshot *new_snapshot,
373         const struct timeval *tv)
374 {
375         struct ast_channel_snapshot *snapshot = new_snapshot ?
376                 new_snapshot : old_snapshot;
377
378         if (!old_snapshot) {
379                 return channel_created_event(snapshot, tv);
380         } else if (!new_snapshot) {
381                 return channel_destroyed_event(snapshot, tv);
382         } else if (old_snapshot->state != new_snapshot->state) {
383                 return channel_state_change_event(snapshot, tv);
384         }
385
386         return NULL;
387 }
388
389 static struct ast_json *channel_dialplan(
390         struct ast_channel_snapshot *old_snapshot,
391         struct ast_channel_snapshot *new_snapshot,
392         const struct timeval *tv)
393 {
394         struct ast_json *json_channel;
395
396         /* No Newexten event on cache clear or first event */
397         if (!old_snapshot || !new_snapshot) {
398                 return NULL;
399         }
400
401         /* Empty application is not valid for a Newexten event */
402         if (ast_strlen_zero(new_snapshot->appl)) {
403                 return NULL;
404         }
405
406         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
407                 return NULL;
408         }
409
410         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
411         if (!json_channel) {
412                 return NULL;
413         }
414
415         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
416                 "type", "ChannelDialplan",
417                 "timestamp", ast_json_timeval(*tv, NULL),
418                 "dialplan_app", new_snapshot->appl,
419                 "dialplan_app_data", new_snapshot->data,
420                 "channel", json_channel);
421 }
422
423 static struct ast_json *channel_callerid(
424         struct ast_channel_snapshot *old_snapshot,
425         struct ast_channel_snapshot *new_snapshot,
426         const struct timeval *tv)
427 {
428         struct ast_json *json_channel;
429
430         /* No NewCallerid event on cache clear or first event */
431         if (!old_snapshot || !new_snapshot) {
432                 return NULL;
433         }
434
435         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
436                 return NULL;
437         }
438
439         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
440         if (!json_channel) {
441                 return NULL;
442         }
443
444         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
445                 "type", "ChannelCallerId",
446                 "timestamp", ast_json_timeval(*tv, NULL),
447                 "caller_presentation", new_snapshot->caller_pres,
448                 "caller_presentation_txt", ast_describe_caller_presentation(
449                         new_snapshot->caller_pres),
450                 "channel", json_channel);
451 }
452
453 static channel_snapshot_monitor channel_monitors[] = {
454         channel_state,
455         channel_dialplan,
456         channel_callerid,
457 };
458
459 static void sub_channel_update_handler(void *data,
460         struct stasis_subscription *sub,
461         struct stasis_message *message)
462 {
463         struct stasis_app *app = data;
464         struct stasis_cache_update *update;
465         struct ast_channel_snapshot *new_snapshot;
466         struct ast_channel_snapshot *old_snapshot;
467         const struct timeval *tv;
468         int i;
469
470         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
471
472         update = stasis_message_data(message);
473
474         ast_assert(update->type == ast_channel_snapshot_type());
475
476         new_snapshot = stasis_message_data(update->new_snapshot);
477         old_snapshot = stasis_message_data(update->old_snapshot);
478
479         /* Pull timestamp from the new snapshot, or from the update message
480          * when there isn't one. */
481         tv = update->new_snapshot ?
482                 stasis_message_timestamp(update->new_snapshot) :
483                 stasis_message_timestamp(message);
484
485         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
486                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
487
488                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
489                 if (msg) {
490                         app_send(app, msg);
491                 }
492         }
493
494         if (!new_snapshot && old_snapshot) {
495                 unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
496         }
497 }
498
499 static struct ast_json *simple_endpoint_event(
500         const char *type,
501         struct ast_endpoint_snapshot *snapshot,
502         const struct timeval *tv)
503 {
504         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
505
506         if (!json_endpoint) {
507                 return NULL;
508         }
509
510         return ast_json_pack("{s: s, s: o, s: o}",
511                 "type", type,
512                 "timestamp", ast_json_timeval(*tv, NULL),
513                 "endpoint", json_endpoint);
514 }
515
516 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
517 {
518         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
519         struct ast_json *json_endpoint;
520         struct stasis_app *app = pvt;
521         char *tech;
522         char *resource;
523
524         tech = ast_strdupa(endpoint_id);
525         resource = strchr(tech, '/');
526         if (resource) {
527                 resource[0] = '\0';
528                 resource++;
529         }
530
531         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
532                 return -1;
533         }
534
535         snapshot = ast_endpoint_latest_snapshot(tech, resource);
536         if (!snapshot) {
537                 return -1;
538         }
539
540         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
541         if (!json_endpoint) {
542                 return -1;
543         }
544
545         app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
546                 "type", "TextMessageReceived",
547                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
548                 "endpoint", json_endpoint,
549                 "message", json_msg));
550
551         return 0;
552 }
553
554 static void sub_endpoint_update_handler(void *data,
555         struct stasis_subscription *sub,
556         struct stasis_message *message)
557 {
558         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
559         struct stasis_app *app = data;
560         struct stasis_cache_update *update;
561         struct ast_endpoint_snapshot *new_snapshot;
562         struct ast_endpoint_snapshot *old_snapshot;
563         const struct timeval *tv;
564
565         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
566
567         update = stasis_message_data(message);
568
569         ast_assert(update->type == ast_endpoint_snapshot_type());
570
571         new_snapshot = stasis_message_data(update->new_snapshot);
572         old_snapshot = stasis_message_data(update->old_snapshot);
573
574         if (new_snapshot) {
575                 tv = stasis_message_timestamp(update->new_snapshot);
576
577                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
578                 if (!json) {
579                         return;
580                 }
581
582                 app_send(app, json);
583         }
584
585         if (!new_snapshot && old_snapshot) {
586                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
587         }
588 }
589
590 static struct ast_json *simple_bridge_event(
591         const char *type,
592         struct ast_bridge_snapshot *snapshot,
593         const struct timeval *tv)
594 {
595         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
596         if (!json_bridge) {
597                 return NULL;
598         }
599
600         return ast_json_pack("{s: s, s: o, s: o}",
601                 "type", type,
602                 "timestamp", ast_json_timeval(*tv, NULL),
603                 "bridge", json_bridge);
604 }
605
606 static void sub_bridge_update_handler(void *data,
607         struct stasis_subscription *sub,
608         struct stasis_message *message)
609 {
610         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
611         struct stasis_app *app = data;
612         struct stasis_cache_update *update;
613         struct ast_bridge_snapshot *new_snapshot;
614         struct ast_bridge_snapshot *old_snapshot;
615         const struct timeval *tv;
616
617         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
618
619         update = stasis_message_data(message);
620
621         ast_assert(update->type == ast_bridge_snapshot_type());
622
623         new_snapshot = stasis_message_data(update->new_snapshot);
624         old_snapshot = stasis_message_data(update->old_snapshot);
625         tv = update->new_snapshot ?
626                 stasis_message_timestamp(update->new_snapshot) :
627                 stasis_message_timestamp(message);
628
629         if (!new_snapshot) {
630                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
631         } else if (!old_snapshot) {
632                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
633         }
634
635         if (json) {
636                 app_send(app, json);
637         }
638
639         if (!new_snapshot && old_snapshot) {
640                 unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
641         }
642 }
643
644
645 /*! \brief Helper function for determining if the application is subscribed to a given entity */
646 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
647 {
648         struct app_forwards *forwards = NULL;
649
650         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
651         if (!forwards) {
652                 return 0;
653         }
654
655         ao2_ref(forwards, -1);
656         return 1;
657 }
658
659 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
660         struct stasis_message *message)
661 {
662         struct stasis_app *app = data;
663         struct ast_bridge_merge_message *merge;
664
665         merge = stasis_message_data(message);
666
667         /* Find out if we're subscribed to either bridge */
668         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
669                 bridge_app_subscribed(app, merge->to->uniqueid)) {
670                 /* Forward the message to the app */
671                 stasis_publish(app->topic, message);
672         }
673 }
674
675 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
676 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
677 {
678         int subscribed = 0;
679         struct ao2_iterator iter;
680         char *uniqueid;
681
682         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
683                 return 1;
684         }
685
686         iter = ao2_iterator_init(snapshot->channels, 0);
687         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
688                 if (bridge_app_subscribed(app, uniqueid)) {
689                         subscribed = 1;
690                         ao2_ref(uniqueid, -1);
691                         break;
692                 }
693         }
694         ao2_iterator_destroy(&iter);
695
696         return subscribed;
697 }
698
699 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
700         struct stasis_message *message)
701 {
702         struct stasis_app *app = data;
703         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
704         struct ast_bridge_snapshot *bridge = transfer_msg->to_transferee.bridge_snapshot;
705
706         if (bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid) ||
707                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
708                 stasis_publish(app->topic, message);
709         }
710 }
711
712 static void set_replacement_channel(struct ast_channel_snapshot *to_be_replaced,
713                 struct ast_channel_snapshot *replacing)
714 {
715         struct stasis_app_control *control = stasis_app_control_find_by_channel_id(
716                 to_be_replaced->uniqueid);
717         struct ast_channel *chan = ast_channel_get_by_name(replacing->uniqueid);
718
719         if (control && chan) {
720                 ast_channel_lock(chan);
721                 app_set_replace_channel_app(chan, app_name(control_app(control)));
722                 app_set_replace_channel_snapshot(chan, to_be_replaced);
723                 ast_channel_unlock(chan);
724         }
725         ast_channel_cleanup(chan);
726         ao2_cleanup(control);
727 }
728
729 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
730         struct stasis_message *message)
731 {
732         struct stasis_app *app = data;
733         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
734         int subscribed = 0;
735
736         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
737         if (!subscribed) {
738                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
739         }
740         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
741                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
742         }
743         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
744                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
745         }
746
747         if (!subscribed) {
748                 switch (transfer_msg->dest_type) {
749                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
750                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
751                         break;
752                 case AST_ATTENDED_TRANSFER_DEST_LINK:
753                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
754                         if (!subscribed) {
755                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
756                         }
757                         break;
758                 break;
759                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
760                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
761                         if (!subscribed) {
762                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
763                         }
764                         break;
765                 default:
766                         break;
767                 }
768         }
769
770         if (subscribed) {
771                 stasis_publish(app->topic, message);
772         }
773
774         if (transfer_msg->replace_channel) {
775                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
776                                 transfer_msg->replace_channel);
777         }
778
779         if (transfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_LINK) {
780                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
781                                 transfer_msg->dest.links[0]);
782                 set_replacement_channel(transfer_msg->to_transfer_target.channel_snapshot,
783                                 transfer_msg->dest.links[1]);
784         }
785 }
786
/*!
 * \brief Default handler for the bridge router.
 *
 * Its only job is to release the router's reference to the app when the
 * subscription's final message arrives.
 */
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
796
797 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
798 {
799         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
800         size_t size;
801         int res = 0;
802
803         ast_assert(name != NULL);
804         ast_assert(handler != NULL);
805
806         ast_verb(1, "Creating Stasis app '%s'\n", name);
807
808         size = sizeof(*app) + strlen(name) + 1;
809         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
810
811         if (!app) {
812                 return NULL;
813         }
814
815         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
816                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
817                 forwards_sort, NULL);
818         if (!app->forwards) {
819                 return NULL;
820         }
821
822         app->topic = stasis_topic_create(name);
823         if (!app->topic) {
824                 return NULL;
825         }
826
827         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
828         if (!app->bridge_router) {
829                 return NULL;
830         }
831
832         res |= stasis_message_router_add(app->bridge_router,
833                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
834
835         res |= stasis_message_router_add(app->bridge_router,
836                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
837
838         res |= stasis_message_router_add(app->bridge_router,
839                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
840
841         res |= stasis_message_router_set_default(app->bridge_router,
842                 bridge_default_handler, app);
843
844         if (res != 0) {
845                 return NULL;
846         }
847         /* Bridge router holds a reference */
848         ao2_ref(app, +1);
849
850         app->router = stasis_message_router_create(app->topic);
851         if (!app->router) {
852                 return NULL;
853         }
854
855         res |= stasis_message_router_add_cache_update(app->router,
856                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
857
858         res |= stasis_message_router_add_cache_update(app->router,
859                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
860
861         res |= stasis_message_router_add_cache_update(app->router,
862                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
863
864         res |= stasis_message_router_set_default(app->router,
865                 sub_default_handler, app);
866
867         if (res != 0) {
868                 return NULL;
869         }
870         /* Router holds a reference */
871         ao2_ref(app, +1);
872
873         strncpy(app->name, name, size - sizeof(*app));
874         app->handler = handler;
875         ao2_ref(data, +1);
876         app->data = data;
877
878         ao2_ref(app, +1);
879         return app;
880 }
881
882 struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
883         return app->topic;
884 }
885
886 /*!
887  * \brief Send a message to the given application.
888  * \param app App to send the message to.
889  * \param message Message to send.
890  */
891 void app_send(struct stasis_app *app, struct ast_json *message)
892 {
893         stasis_app_cb handler;
894         RAII_VAR(void *, data, NULL, ao2_cleanup);
895
896         /* Copy off mutable state with lock held */
897         {
898                 SCOPED_AO2LOCK(lock, app);
899                 handler = app->handler;
900                 if (app->data) {
901                         ao2_ref(app->data, +1);
902                         data = app->data;
903                 }
904                 /* Name is immutable; no need to copy */
905         }
906
907         if (!handler) {
908                 ast_verb(3,
909                         "Inactive Stasis app '%s' missed message\n", app->name);
910                 return;
911         }
912
913         handler(data, app->name, message);
914 }
915
916 void app_deactivate(struct stasis_app *app)
917 {
918         SCOPED_AO2LOCK(lock, app);
919         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
920         app->handler = NULL;
921         ao2_cleanup(app->data);
922         app->data = NULL;
923 }
924
925 void app_shutdown(struct stasis_app *app)
926 {
927         SCOPED_AO2LOCK(lock, app);
928
929         ast_assert(app_is_finished(app));
930
931         stasis_message_router_unsubscribe(app->router);
932         app->router = NULL;
933         stasis_message_router_unsubscribe(app->bridge_router);
934         app->bridge_router = NULL;
935 }
936
937 int app_is_active(struct stasis_app *app)
938 {
939         SCOPED_AO2LOCK(lock, app);
940         return app->handler != NULL;
941 }
942
943 int app_is_finished(struct stasis_app *app)
944 {
945         SCOPED_AO2LOCK(lock, app);
946
947         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
948 }
949
950 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
951 {
952         SCOPED_AO2LOCK(lock, app);
953
954         if (app->handler) {
955                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
956
957                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
958
959                 msg = ast_json_pack("{s: s, s: s}",
960                         "type", "ApplicationReplaced",
961                         "application", app->name);
962                 if (msg) {
963                         app_send(app, msg);
964                 }
965         } else {
966                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
967         }
968
969         app->handler = handler;
970         ao2_cleanup(app->data);
971         if (data) {
972                 ao2_ref(data, +1);
973         }
974         app->data = data;
975 }
976
977 const char *app_name(const struct stasis_app *app)
978 {
979         return app->name;
980 }
981
982 struct ast_json *app_to_json(const struct stasis_app *app)
983 {
984         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
985         struct ast_json *channels;
986         struct ast_json *bridges;
987         struct ast_json *endpoints;
988         struct ao2_iterator i;
989         void *obj;
990
991         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
992                 "name", app->name,
993                 "channel_ids", "bridge_ids", "endpoint_ids");
994         channels = ast_json_object_get(json, "channel_ids");
995         bridges = ast_json_object_get(json, "bridge_ids");
996         endpoints = ast_json_object_get(json, "endpoint_ids");
997
998         i = ao2_iterator_init(app->forwards, 0);
999         while ((obj = ao2_iterator_next(&i))) {
1000                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
1001                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
1002                 int append_res = -1;
1003
1004                 id = ast_json_string_create(forwards->id);
1005
1006                 switch (forwards->forward_type) {
1007                 case FORWARD_CHANNEL:
1008                         append_res = ast_json_array_append(channels,
1009                                 ast_json_ref(id));
1010                         break;
1011                 case FORWARD_BRIDGE:
1012                         append_res = ast_json_array_append(bridges,
1013                                 ast_json_ref(id));
1014                         break;
1015                 case FORWARD_ENDPOINT:
1016                         append_res = ast_json_array_append(endpoints,
1017                                 ast_json_ref(id));
1018                         break;
1019                 }
1020
1021                 if (append_res != 0) {
1022                         ast_log(LOG_ERROR, "Error building response\n");
1023                         ao2_iterator_destroy(&i);
1024                         return NULL;
1025                 }
1026         }
1027         ao2_iterator_destroy(&i);
1028
1029         return ast_json_ref(json);
1030 }
1031
1032 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1033 {
1034         int res;
1035
1036         if (!app || !chan) {
1037                 return -1;
1038         } else {
1039                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1040                 SCOPED_AO2LOCK(lock, app->forwards);
1041
1042                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
1043                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1044                 if (!forwards) {
1045                         /* Forwards not found, create one */
1046                         forwards = forwards_create_channel(app, chan);
1047                         if (!forwards) {
1048                                 return -1;
1049                         }
1050
1051                         res = ao2_link_flags(app->forwards, forwards,
1052                                 OBJ_NOLOCK);
1053                         if (!res) {
1054                                 return -1;
1055                         }
1056                 }
1057
1058                 ++forwards->interested;
1059                 ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
1060                 return 0;
1061         }
1062 }
1063
/*!
 * \brief Event-source adapter: subscribe \a app to a channel passed as \c void*.
 * \param app Application to subscribe.
 * \param obj Channel to subscribe to, forwarded to app_subscribe_channel().
 * \return Whatever app_subscribe_channel() returns.
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1068
1069 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1070 {
1071         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1072         SCOPED_AO2LOCK(lock, app->forwards);
1073
1074         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1075         if (!forwards) {
1076                 ast_log(LOG_WARNING,
1077                         "App '%s' not subscribed to %s '%s'\n",
1078                         app->name, kind, id);
1079                 return -1;
1080         }
1081         forwards->interested--;
1082
1083         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1084         if (forwards->interested == 0 || terminate) {
1085                 /* No one is interested any more; unsubscribe */
1086                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1087                 forwards_unsubscribe(forwards);
1088                 ao2_find(app->forwards, forwards,
1089                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1090                         OBJ_NODATA);
1091
1092                 if (!strcmp(kind, "endpoint")) {
1093                         messaging_app_unsubscribe_endpoint(app->name, id);
1094                 }
1095         }
1096
1097         return 0;
1098 }
1099
/*!
 * \brief Unsubscribe an application from a channel.
 * \param app Application to unsubscribe.
 * \param chan Channel whose unique id identifies the subscription.
 * \retval -1 \a app or \a chan is NULL.
 * \return Otherwise the result of app_unsubscribe_channel_id().
 */
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
	}

	return -1;
}
1108
/*!
 * \brief Unsubscribe an application from a channel identified by id.
 * \param app Application to unsubscribe.
 * \param channel_id Key of the channel's forwards entry.
 * \retval -1 \a app or \a channel_id is NULL.
 * \return Otherwise the result of unsubscribe() with no forced termination.
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	if (app && channel_id) {
		return unsubscribe(app, "channel", channel_id, 0);
	}

	return -1;
}
1117
1118 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1119 {
1120         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1121         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1122         return forwards != NULL;
1123 }
1124
1125 int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan)
1126 {
1127         RAII_VAR(struct app_forwards *, old_forwards, NULL, ao2_cleanup);
1128         struct app_forwards *new_forwards;
1129
1130         old_forwards = ao2_find(app->forwards, old_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
1131         if (!old_forwards) {
1132                 return -1;
1133         }
1134
1135         new_forwards = forwards_create_channel(app, new_chan);
1136         if (!new_forwards) {
1137                 return -1;
1138         }
1139
1140         new_forwards->interested = old_forwards->interested;
1141         ao2_link_flags(app->forwards, new_forwards, 0);
1142         ao2_cleanup(new_forwards);
1143
1144         /* Clean up old forwards */
1145         forwards_unsubscribe(old_forwards);
1146         return 0;
1147 }
1148
/*!
 * \brief Event-source lookup for channels.
 * \param app Unused.
 * \param id Identifier handed to ast_channel_get_by_name().
 * \return Whatever ast_channel_get_by_name() returns for \a id.
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	void *found = ast_channel_get_by_name(id);

	return found;
}
1153
1154 struct stasis_app_event_source channel_event_source = {
1155         .scheme = "channel:",
1156         .find = channel_find,
1157         .subscribe = subscribe_channel,
1158         .unsubscribe = app_unsubscribe_channel_id,
1159         .is_subscribed = app_is_subscribed_channel_id
1160 };
1161
1162 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1163 {
1164         if (!app || !bridge) {
1165                 return -1;
1166         } else {
1167                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1168                 SCOPED_AO2LOCK(lock, app->forwards);
1169
1170                 forwards = ao2_find(app->forwards, bridge->uniqueid,
1171                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1172
1173                 if (!forwards) {
1174                         /* Forwards not found, create one */
1175                         forwards = forwards_create_bridge(app, bridge);
1176                         if (!forwards) {
1177                                 return -1;
1178                         }
1179                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1180                 }
1181
1182                 ++forwards->interested;
1183                 ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
1184                 return 0;
1185         }
1186 }
1187
/*!
 * \brief Event-source adapter: subscribe \a app to a bridge passed as \c void*.
 * \param app Application to subscribe.
 * \param obj Bridge to subscribe to, forwarded to app_subscribe_bridge().
 * \return Whatever app_subscribe_bridge() returns.
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1192
1193 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1194 {
1195         if (!app || !bridge) {
1196                 return -1;
1197         }
1198
1199         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
1200 }
1201
/*!
 * \brief Unsubscribe an application from a bridge identified by id.
 * \param app Application to unsubscribe.
 * \param bridge_id Key of the bridge's forwards entry.
 * \retval -1 \a app or \a bridge_id is NULL.
 * \return Otherwise the result of unsubscribe() with no forced termination.
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	if (app && bridge_id) {
		return unsubscribe(app, "bridge", bridge_id, 0);
	}

	return -1;
}
1210
1211 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1212 {
1213         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1214         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1215         return forwards != NULL;
1216 }
1217
/*!
 * \brief Event-source lookup for bridges.
 * \param app Unused.
 * \param id Identifier handed to stasis_app_bridge_find_by_id().
 * \return Whatever stasis_app_bridge_find_by_id() returns for \a id.
 */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	void *found = stasis_app_bridge_find_by_id(id);

	return found;
}
1222
1223 struct stasis_app_event_source bridge_event_source = {
1224         .scheme = "bridge:",
1225         .find = bridge_find,
1226         .subscribe = subscribe_bridge,
1227         .unsubscribe = app_unsubscribe_bridge_id,
1228         .is_subscribed = app_is_subscribed_bridge_id
1229 };
1230
1231 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1232 {
1233         if (!app || !endpoint) {
1234                 return -1;
1235         } else {
1236                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1237                 SCOPED_AO2LOCK(lock, app->forwards);
1238
1239                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
1240                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1241
1242                 if (!forwards) {
1243                         /* Forwards not found, create one */
1244                         forwards = forwards_create_endpoint(app, endpoint);
1245                         if (!forwards) {
1246                                 return -1;
1247                         }
1248                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1249
1250                         /* Subscribe for messages */
1251                         messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1252                 }
1253
1254                 ++forwards->interested;
1255                 ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
1256                 return 0;
1257         }
1258 }
1259
/*!
 * \brief Event-source adapter: subscribe \a app to an endpoint passed as \c void*.
 * \param app Application to subscribe.
 * \param obj Endpoint to subscribe to, forwarded to app_subscribe_endpoint().
 * \return Whatever app_subscribe_endpoint() returns.
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1264
/*!
 * \brief Unsubscribe an application from an endpoint identified by id.
 * \param app Application to unsubscribe.
 * \param endpoint_id Key of the endpoint's forwards entry.
 * \retval -1 \a app or \a endpoint_id is NULL.
 * \return Otherwise the result of unsubscribe() with no forced termination.
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	if (app && endpoint_id) {
		return unsubscribe(app, "endpoint", endpoint_id, 0);
	}

	return -1;
}
1273
1274 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1275 {
1276         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1277         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1278         return forwards != NULL;
1279 }
1280
/*!
 * \brief Event-source lookup for endpoints.
 * \param app Unused.
 * \param id Identifier handed to ast_endpoint_find_by_id().
 * \return Whatever ast_endpoint_find_by_id() returns for \a id.
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	void *found = ast_endpoint_find_by_id(id);

	return found;
}
1285
1286 struct stasis_app_event_source endpoint_event_source = {
1287         .scheme = "endpoint:",
1288         .find = endpoint_find,
1289         .subscribe = subscribe_endpoint,
1290         .unsubscribe = app_unsubscribe_endpoint_id,
1291         .is_subscribed = app_is_subscribed_endpoint_id
1292 };
1293
1294 void stasis_app_register_event_sources(void)
1295 {
1296         stasis_app_register_event_source(&channel_event_source);
1297         stasis_app_register_event_source(&bridge_event_source);
1298         stasis_app_register_event_source(&endpoint_event_source);
1299 }
1300
1301 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1302 {
1303         return obj == &endpoint_event_source ||
1304                 obj == &bridge_event_source ||
1305                 obj == &channel_event_source;
1306 }
1307
1308 void stasis_app_unregister_event_sources(void)
1309 {
1310         stasis_app_unregister_event_source(&endpoint_event_source);
1311         stasis_app_unregister_event_source(&bridge_event_source);
1312         stasis_app_unregister_event_source(&channel_event_source);
1313 }
1314
1315