cda1c045d55a9b8a5c5fc7ddf2353cc0f171482c
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31 #include "control.h"
32 #include "messaging.h"
33
34 #include "asterisk/callerid.h"
35 #include "asterisk/stasis_app.h"
36 #include "asterisk/stasis_bridges.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40
41 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
42
43 struct stasis_app {
44         /*! Aggregation topic for this application. */
45         struct stasis_topic *topic;
46         /*! Router for handling messages forwarded to \a topic. */
47         struct stasis_message_router *router;
48         /*! Router for handling messages to the bridge all \a topic. */
49         struct stasis_message_router *bridge_router;
50         /*! Container of the channel forwards to this app's topic. */
51         struct ao2_container *forwards;
52         /*! Callback function for this application. */
53         stasis_app_cb handler;
54         /*! Opaque data to hand to callback function. */
55         void *data;
56         /*! Name of the Stasis application */
57         char name[];
58 };
59
60 enum forward_type {
61         FORWARD_CHANNEL,
62         FORWARD_BRIDGE,
63         FORWARD_ENDPOINT,
64 };
65
66 /*! Subscription info for a particular channel/bridge. */
67 struct app_forwards {
68         /*! Count of number of times this channel/bridge has been subscribed */
69         int interested;
70
71         /*! Forward for the regular topic */
72         struct stasis_forward *topic_forward;
73         /*! Forward for the caching topic */
74         struct stasis_forward *topic_cached_forward;
75
76         /* Type of object being forwarded */
77         enum forward_type forward_type;
78         /*! Unique id of the object being forwarded */
79         char id[];
80 };
81
82 static void forwards_dtor(void *obj)
83 {
84 #ifdef AST_DEVMODE
85         struct app_forwards *forwards = obj;
86 #endif /* AST_DEVMODE */
87
88         ast_assert(forwards->topic_forward == NULL);
89         ast_assert(forwards->topic_cached_forward == NULL);
90 }
91
92 static void forwards_unsubscribe(struct app_forwards *forwards)
93 {
94         stasis_forward_cancel(forwards->topic_forward);
95         forwards->topic_forward = NULL;
96         stasis_forward_cancel(forwards->topic_cached_forward);
97         forwards->topic_cached_forward = NULL;
98 }
99
100 static struct app_forwards *forwards_create(struct stasis_app *app,
101         const char *id)
102 {
103         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
104
105         if (!app || ast_strlen_zero(id)) {
106                 return NULL;
107         }
108
109         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
110         if (!forwards) {
111                 return NULL;
112         }
113
114         strcpy(forwards->id, id);
115
116         ao2_ref(forwards, +1);
117         return forwards;
118 }
119
120 /*! Forward a channel's topics to an app */
121 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
122         struct ast_channel *chan)
123 {
124         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
125
126         if (!app || !chan) {
127                 return NULL;
128         }
129
130         forwards = forwards_create(app, ast_channel_uniqueid(chan));
131         if (!forwards) {
132                 return NULL;
133         }
134
135         forwards->forward_type = FORWARD_CHANNEL;
136         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
137                 app->topic);
138         if (!forwards->topic_forward) {
139                 return NULL;
140         }
141
142         forwards->topic_cached_forward = stasis_forward_all(
143                 ast_channel_topic_cached(chan), app->topic);
144         if (!forwards->topic_cached_forward) {
145                 /* Half-subscribed is a bad thing */
146                 stasis_forward_cancel(forwards->topic_forward);
147                 forwards->topic_forward = NULL;
148                 return NULL;
149         }
150
151         ao2_ref(forwards, +1);
152         return forwards;
153 }
154
155 /*! Forward a bridge's topics to an app */
156 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
157         struct ast_bridge *bridge)
158 {
159         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
160
161         if (!app || !bridge) {
162                 return NULL;
163         }
164
165         forwards = forwards_create(app, bridge->uniqueid);
166         if (!forwards) {
167                 return NULL;
168         }
169
170         forwards->forward_type = FORWARD_BRIDGE;
171         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
172                 app->topic);
173         if (!forwards->topic_forward) {
174                 return NULL;
175         }
176
177         forwards->topic_cached_forward = stasis_forward_all(
178                 ast_bridge_topic_cached(bridge), app->topic);
179         if (!forwards->topic_cached_forward) {
180                 /* Half-subscribed is a bad thing */
181                 stasis_forward_cancel(forwards->topic_forward);
182                 forwards->topic_forward = NULL;
183                 return NULL;
184         }
185
186         ao2_ref(forwards, +1);
187         return forwards;
188 }
189
190 /*! Forward a endpoint's topics to an app */
191 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
192         struct ast_endpoint *endpoint)
193 {
194         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
195
196         if (!app || !endpoint) {
197                 return NULL;
198         }
199
200         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
201         if (!forwards) {
202                 return NULL;
203         }
204
205         forwards->forward_type = FORWARD_ENDPOINT;
206         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
207                 app->topic);
208         if (!forwards->topic_forward) {
209                 return NULL;
210         }
211
212         forwards->topic_cached_forward = stasis_forward_all(
213                 ast_endpoint_topic_cached(endpoint), app->topic);
214         if (!forwards->topic_cached_forward) {
215                 /* Half-subscribed is a bad thing */
216                 stasis_forward_cancel(forwards->topic_forward);
217                 forwards->topic_forward = NULL;
218                 return NULL;
219         }
220
221         ao2_ref(forwards, +1);
222         return forwards;
223 }
224
225 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
226 {
227         const struct app_forwards *object_left = obj_left;
228         const struct app_forwards *object_right = obj_right;
229         const char *right_key = obj_right;
230         int cmp;
231
232         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
233         case OBJ_POINTER:
234                 right_key = object_right->id;
235                 /* Fall through */
236         case OBJ_KEY:
237                 cmp = strcmp(object_left->id, right_key);
238                 break;
239         case OBJ_PARTIAL_KEY:
240                 /*
241                  * We could also use a partial key struct containing a length
242                  * so strlen() does not get called for every comparison instead.
243                  */
244                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
245                 break;
246         default:
247                 /* Sort can only work on something with a full or partial key. */
248                 ast_assert(0);
249                 cmp = 0;
250                 break;
251         }
252         return cmp;
253 }
254
255 static void app_dtor(void *obj)
256 {
257         struct stasis_app *app = obj;
258
259         ast_verb(1, "Destroying Stasis app %s\n", app->name);
260
261         ast_assert(app->router == NULL);
262         ast_assert(app->bridge_router == NULL);
263
264         ao2_cleanup(app->topic);
265         app->topic = NULL;
266         ao2_cleanup(app->forwards);
267         app->forwards = NULL;
268         ao2_cleanup(app->data);
269         app->data = NULL;
270 }
271
272 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
273 {
274         struct ast_multi_channel_blob *payload = stasis_message_data(message);
275         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
276         struct ast_channel *chan;
277
278         if (!snapshot) {
279                 return;
280         }
281
282         chan = ast_channel_get_by_name(snapshot->uniqueid);
283         if (!chan) {
284                 return;
285         }
286
287         app_subscribe_channel(app, chan);
288         ast_channel_unref(chan);
289 }
290
291 static void sub_default_handler(void *data, struct stasis_subscription *sub,
292         struct stasis_message *message)
293 {
294         struct stasis_app *app = data;
295         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
296
297         if (stasis_subscription_final_message(sub, message)) {
298                 ao2_cleanup(app);
299         }
300
301         if (stasis_message_type(message) == ast_channel_dial_type()) {
302                 call_forwarded_handler(app, message);
303         }
304
305         if (stasis_message_type(message) == app_end_message_type()) {
306                 app_end_message_handler(message);
307         }
308
309         /* By default, send any message that has a JSON representation */
310         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
311         if (!json) {
312                 return;
313         }
314
315         app_send(app, json);
316 }
317
318 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
319 typedef struct ast_json *(*channel_snapshot_monitor)(
320         struct ast_channel_snapshot *old_snapshot,
321         struct ast_channel_snapshot *new_snapshot,
322         const struct timeval *tv);
323
324 static struct ast_json *simple_channel_event(
325         const char *type,
326         struct ast_channel_snapshot *snapshot,
327         const struct timeval *tv)
328 {
329         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
330
331         if (!json_channel) {
332                 return NULL;
333         }
334
335         return ast_json_pack("{s: s, s: o, s: o}",
336                 "type", type,
337                 "timestamp", ast_json_timeval(*tv, NULL),
338                 "channel", json_channel);
339 }
340
341 static struct ast_json *channel_created_event(
342         struct ast_channel_snapshot *snapshot,
343         const struct timeval *tv)
344 {
345         return simple_channel_event("ChannelCreated", snapshot, tv);
346 }
347
348 static struct ast_json *channel_destroyed_event(
349         struct ast_channel_snapshot *snapshot,
350         const struct timeval *tv)
351 {
352         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
353
354         if (!json_channel) {
355                 return NULL;
356         }
357
358         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
359                 "type", "ChannelDestroyed",
360                 "timestamp", ast_json_timeval(*tv, NULL),
361                 "cause", snapshot->hangupcause,
362                 "cause_txt", ast_cause2str(snapshot->hangupcause),
363                 "channel", json_channel);
364 }
365
366 static struct ast_json *channel_state_change_event(
367         struct ast_channel_snapshot *snapshot,
368         const struct timeval *tv)
369 {
370         return simple_channel_event("ChannelStateChange", snapshot, tv);
371 }
372
373 /*! \brief Handle channel state changes */
374 static struct ast_json *channel_state(
375         struct ast_channel_snapshot *old_snapshot,
376         struct ast_channel_snapshot *new_snapshot,
377         const struct timeval *tv)
378 {
379         struct ast_channel_snapshot *snapshot = new_snapshot ?
380                 new_snapshot : old_snapshot;
381
382         if (!old_snapshot) {
383                 return channel_created_event(snapshot, tv);
384         } else if (!new_snapshot) {
385                 return channel_destroyed_event(snapshot, tv);
386         } else if (old_snapshot->state != new_snapshot->state) {
387                 return channel_state_change_event(snapshot, tv);
388         }
389
390         return NULL;
391 }
392
393 static struct ast_json *channel_dialplan(
394         struct ast_channel_snapshot *old_snapshot,
395         struct ast_channel_snapshot *new_snapshot,
396         const struct timeval *tv)
397 {
398         struct ast_json *json_channel;
399
400         /* No Newexten event on cache clear or first event */
401         if (!old_snapshot || !new_snapshot) {
402                 return NULL;
403         }
404
405         /* Empty application is not valid for a Newexten event */
406         if (ast_strlen_zero(new_snapshot->appl)) {
407                 return NULL;
408         }
409
410         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
411                 return NULL;
412         }
413
414         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
415         if (!json_channel) {
416                 return NULL;
417         }
418
419         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
420                 "type", "ChannelDialplan",
421                 "timestamp", ast_json_timeval(*tv, NULL),
422                 "dialplan_app", new_snapshot->appl,
423                 "dialplan_app_data", new_snapshot->data,
424                 "channel", json_channel);
425 }
426
427 static struct ast_json *channel_callerid(
428         struct ast_channel_snapshot *old_snapshot,
429         struct ast_channel_snapshot *new_snapshot,
430         const struct timeval *tv)
431 {
432         struct ast_json *json_channel;
433
434         /* No NewCallerid event on cache clear or first event */
435         if (!old_snapshot || !new_snapshot) {
436                 return NULL;
437         }
438
439         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
440                 return NULL;
441         }
442
443         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
444         if (!json_channel) {
445                 return NULL;
446         }
447
448         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
449                 "type", "ChannelCallerId",
450                 "timestamp", ast_json_timeval(*tv, NULL),
451                 "caller_presentation", new_snapshot->caller_pres,
452                 "caller_presentation_txt", ast_describe_caller_presentation(
453                         new_snapshot->caller_pres),
454                 "channel", json_channel);
455 }
456
457 static channel_snapshot_monitor channel_monitors[] = {
458         channel_state,
459         channel_dialplan,
460         channel_callerid,
461 };
462
463 static void sub_channel_update_handler(void *data,
464         struct stasis_subscription *sub,
465         struct stasis_message *message)
466 {
467         struct stasis_app *app = data;
468         struct stasis_cache_update *update;
469         struct ast_channel_snapshot *new_snapshot;
470         struct ast_channel_snapshot *old_snapshot;
471         const struct timeval *tv;
472         int i;
473
474         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
475
476         update = stasis_message_data(message);
477
478         ast_assert(update->type == ast_channel_snapshot_type());
479
480         new_snapshot = stasis_message_data(update->new_snapshot);
481         old_snapshot = stasis_message_data(update->old_snapshot);
482
483         /* Pull timestamp from the new snapshot, or from the update message
484          * when there isn't one. */
485         tv = update->new_snapshot ?
486                 stasis_message_timestamp(update->new_snapshot) :
487                 stasis_message_timestamp(message);
488
489         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
490                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
491
492                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
493                 if (msg) {
494                         app_send(app, msg);
495                 }
496         }
497
498         if (!new_snapshot && old_snapshot) {
499                 unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
500         }
501 }
502
503 static struct ast_json *simple_endpoint_event(
504         const char *type,
505         struct ast_endpoint_snapshot *snapshot,
506         const struct timeval *tv)
507 {
508         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
509
510         if (!json_endpoint) {
511                 return NULL;
512         }
513
514         return ast_json_pack("{s: s, s: o, s: o}",
515                 "type", type,
516                 "timestamp", ast_json_timeval(*tv, NULL),
517                 "endpoint", json_endpoint);
518 }
519
520 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
521 {
522         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
523         struct ast_json *json_endpoint;
524         struct stasis_app *app = pvt;
525         char *tech;
526         char *resource;
527
528         tech = ast_strdupa(endpoint_id);
529         resource = strchr(tech, '/');
530         if (resource) {
531                 resource[0] = '\0';
532                 resource++;
533         }
534
535         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
536                 return -1;
537         }
538
539         snapshot = ast_endpoint_latest_snapshot(tech, resource);
540         if (!snapshot) {
541                 return -1;
542         }
543
544         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
545         if (!json_endpoint) {
546                 return -1;
547         }
548
549         app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
550                 "type", "TextMessageReceived",
551                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
552                 "endpoint", json_endpoint,
553                 "message", json_msg));
554
555         return 0;
556 }
557
558 static void sub_endpoint_update_handler(void *data,
559         struct stasis_subscription *sub,
560         struct stasis_message *message)
561 {
562         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
563         struct stasis_app *app = data;
564         struct stasis_cache_update *update;
565         struct ast_endpoint_snapshot *new_snapshot;
566         struct ast_endpoint_snapshot *old_snapshot;
567         const struct timeval *tv;
568
569         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
570
571         update = stasis_message_data(message);
572
573         ast_assert(update->type == ast_endpoint_snapshot_type());
574
575         new_snapshot = stasis_message_data(update->new_snapshot);
576         old_snapshot = stasis_message_data(update->old_snapshot);
577
578         if (new_snapshot) {
579                 tv = stasis_message_timestamp(update->new_snapshot);
580
581                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
582                 if (!json) {
583                         return;
584                 }
585
586                 app_send(app, json);
587         }
588
589         if (!new_snapshot && old_snapshot) {
590                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
591         }
592 }
593
594 static struct ast_json *simple_bridge_event(
595         const char *type,
596         struct ast_bridge_snapshot *snapshot,
597         const struct timeval *tv)
598 {
599         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
600         if (!json_bridge) {
601                 return NULL;
602         }
603
604         return ast_json_pack("{s: s, s: o, s: o}",
605                 "type", type,
606                 "timestamp", ast_json_timeval(*tv, NULL),
607                 "bridge", json_bridge);
608 }
609
610 static void sub_bridge_update_handler(void *data,
611         struct stasis_subscription *sub,
612         struct stasis_message *message)
613 {
614         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
615         struct stasis_app *app = data;
616         struct stasis_cache_update *update;
617         struct ast_bridge_snapshot *new_snapshot;
618         struct ast_bridge_snapshot *old_snapshot;
619         const struct timeval *tv;
620
621         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
622
623         update = stasis_message_data(message);
624
625         ast_assert(update->type == ast_bridge_snapshot_type());
626
627         new_snapshot = stasis_message_data(update->new_snapshot);
628         old_snapshot = stasis_message_data(update->old_snapshot);
629         tv = update->new_snapshot ?
630                 stasis_message_timestamp(update->new_snapshot) :
631                 stasis_message_timestamp(message);
632
633         if (!new_snapshot) {
634                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
635         } else if (!old_snapshot) {
636                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
637         }
638
639         if (json) {
640                 app_send(app, json);
641         }
642
643         if (!new_snapshot && old_snapshot) {
644                 unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
645         }
646 }
647
648
649 /*! \brief Helper function for determining if the application is subscribed to a given entity */
650 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
651 {
652         struct app_forwards *forwards = NULL;
653
654         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
655         if (!forwards) {
656                 return 0;
657         }
658
659         ao2_ref(forwards, -1);
660         return 1;
661 }
662
663 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
664         struct stasis_message *message)
665 {
666         struct stasis_app *app = data;
667         struct ast_bridge_merge_message *merge;
668
669         merge = stasis_message_data(message);
670
671         /* Find out if we're subscribed to either bridge */
672         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
673                 bridge_app_subscribed(app, merge->to->uniqueid)) {
674                 /* Forward the message to the app */
675                 stasis_publish(app->topic, message);
676         }
677 }
678
679 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
680 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
681 {
682         int subscribed = 0;
683         struct ao2_iterator iter;
684         char *uniqueid;
685
686         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
687                 return 1;
688         }
689
690         iter = ao2_iterator_init(snapshot->channels, 0);
691         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
692                 if (bridge_app_subscribed(app, uniqueid)) {
693                         subscribed = 1;
694                         ao2_ref(uniqueid, -1);
695                         break;
696                 }
697         }
698         ao2_iterator_destroy(&iter);
699
700         return subscribed;
701 }
702
703 static void set_replacement_channel(struct ast_channel_snapshot *to_be_replaced,
704                 struct ast_channel_snapshot *replacing)
705 {
706         struct stasis_app_control *control = stasis_app_control_find_by_channel_id(
707                 to_be_replaced->uniqueid);
708         struct ast_channel *chan = ast_channel_get_by_name(replacing->uniqueid);
709
710         if (control && chan) {
711                 ast_channel_lock(chan);
712                 app_set_replace_channel_app(chan, app_name(control_app(control)));
713                 app_set_replace_channel_snapshot(chan, to_be_replaced);
714                 ast_channel_unlock(chan);
715         }
716         ast_channel_cleanup(chan);
717         ao2_cleanup(control);
718 }
719
720 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
721         struct stasis_message *message)
722 {
723         struct stasis_app *app = data;
724         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
725         struct ast_bridge_snapshot *bridge = transfer_msg->to_transferee.bridge_snapshot;
726
727         if (transfer_msg->replace_channel) {
728                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
729                                 transfer_msg->replace_channel);
730         }
731
732         if (bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid) ||
733                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
734                 stasis_publish(app->topic, message);
735         }
736 }
737
738 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
739         struct stasis_message *message)
740 {
741         struct stasis_app *app = data;
742         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
743         int subscribed = 0;
744
745         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
746         if (!subscribed) {
747                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
748         }
749         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
750                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
751         }
752         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
753                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
754         }
755
756         if (!subscribed) {
757                 switch (transfer_msg->dest_type) {
758                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
759                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
760                         break;
761                 case AST_ATTENDED_TRANSFER_DEST_LINK:
762                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
763                         if (!subscribed) {
764                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
765                         }
766                         break;
767                 break;
768                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
769                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
770                         if (!subscribed) {
771                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
772                         }
773                         break;
774                 default:
775                         break;
776                 }
777         }
778
779         if (subscribed) {
780                 stasis_publish(app->topic, message);
781         }
782
783         if (transfer_msg->replace_channel) {
784                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
785                                 transfer_msg->replace_channel);
786         }
787
788         if (transfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_LINK) {
789                 set_replacement_channel(transfer_msg->to_transferee.channel_snapshot,
790                                 transfer_msg->dest.links[0]);
791                 set_replacement_channel(transfer_msg->to_transfer_target.channel_snapshot,
792                                 transfer_msg->dest.links[1]);
793         }
794 }
795
796 static void bridge_default_handler(void *data, struct stasis_subscription *sub,
797         struct stasis_message *message)
798 {
799         struct stasis_app *app = data;
800
801         if (stasis_subscription_final_message(sub, message)) {
802                 ao2_cleanup(app);
803         }
804 }
805
806 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
807 {
808         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
809         size_t size;
810         int res = 0;
811
812         ast_assert(name != NULL);
813         ast_assert(handler != NULL);
814
815         ast_verb(1, "Creating Stasis app '%s'\n", name);
816
817         size = sizeof(*app) + strlen(name) + 1;
818         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
819
820         if (!app) {
821                 return NULL;
822         }
823
824         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
825                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
826                 forwards_sort, NULL);
827         if (!app->forwards) {
828                 return NULL;
829         }
830
831         app->topic = stasis_topic_create(name);
832         if (!app->topic) {
833                 return NULL;
834         }
835
836         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
837         if (!app->bridge_router) {
838                 return NULL;
839         }
840
841         res |= stasis_message_router_add(app->bridge_router,
842                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
843
844         res |= stasis_message_router_add(app->bridge_router,
845                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
846
847         res |= stasis_message_router_add(app->bridge_router,
848                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
849
850         res |= stasis_message_router_set_default(app->bridge_router,
851                 bridge_default_handler, app);
852
853         if (res != 0) {
854                 return NULL;
855         }
856         /* Bridge router holds a reference */
857         ao2_ref(app, +1);
858
859         app->router = stasis_message_router_create(app->topic);
860         if (!app->router) {
861                 return NULL;
862         }
863
864         res |= stasis_message_router_add_cache_update(app->router,
865                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
866
867         res |= stasis_message_router_add_cache_update(app->router,
868                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
869
870         res |= stasis_message_router_add_cache_update(app->router,
871                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
872
873         res |= stasis_message_router_set_default(app->router,
874                 sub_default_handler, app);
875
876         if (res != 0) {
877                 return NULL;
878         }
879         /* Router holds a reference */
880         ao2_ref(app, +1);
881
882         strncpy(app->name, name, size - sizeof(*app));
883         app->handler = handler;
884         ao2_ref(data, +1);
885         app->data = data;
886
887         ao2_ref(app, +1);
888         return app;
889 }
890
891 struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
892         return app->topic;
893 }
894
895 /*!
896  * \brief Send a message to the given application.
897  * \param app App to send the message to.
898  * \param message Message to send.
899  */
900 void app_send(struct stasis_app *app, struct ast_json *message)
901 {
902         stasis_app_cb handler;
903         RAII_VAR(void *, data, NULL, ao2_cleanup);
904
905         /* Copy off mutable state with lock held */
906         {
907                 SCOPED_AO2LOCK(lock, app);
908                 handler = app->handler;
909                 if (app->data) {
910                         ao2_ref(app->data, +1);
911                         data = app->data;
912                 }
913                 /* Name is immutable; no need to copy */
914         }
915
916         if (!handler) {
917                 ast_verb(3,
918                         "Inactive Stasis app '%s' missed message\n", app->name);
919                 return;
920         }
921
922         handler(data, app->name, message);
923 }
924
925 void app_deactivate(struct stasis_app *app)
926 {
927         SCOPED_AO2LOCK(lock, app);
928         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
929         app->handler = NULL;
930         ao2_cleanup(app->data);
931         app->data = NULL;
932 }
933
934 void app_shutdown(struct stasis_app *app)
935 {
936         SCOPED_AO2LOCK(lock, app);
937
938         ast_assert(app_is_finished(app));
939
940         stasis_message_router_unsubscribe(app->router);
941         app->router = NULL;
942         stasis_message_router_unsubscribe(app->bridge_router);
943         app->bridge_router = NULL;
944 }
945
946 int app_is_active(struct stasis_app *app)
947 {
948         SCOPED_AO2LOCK(lock, app);
949         return app->handler != NULL;
950 }
951
952 int app_is_finished(struct stasis_app *app)
953 {
954         SCOPED_AO2LOCK(lock, app);
955
956         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
957 }
958
959 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
960 {
961         SCOPED_AO2LOCK(lock, app);
962
963         if (app->handler) {
964                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
965
966                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
967
968                 msg = ast_json_pack("{s: s, s: s}",
969                         "type", "ApplicationReplaced",
970                         "application", app->name);
971                 if (msg) {
972                         app_send(app, msg);
973                 }
974         } else {
975                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
976         }
977
978         app->handler = handler;
979         ao2_cleanup(app->data);
980         if (data) {
981                 ao2_ref(data, +1);
982         }
983         app->data = data;
984 }
985
986 const char *app_name(const struct stasis_app *app)
987 {
988         return app->name;
989 }
990
991 struct ast_json *app_to_json(const struct stasis_app *app)
992 {
993         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
994         struct ast_json *channels;
995         struct ast_json *bridges;
996         struct ast_json *endpoints;
997         struct ao2_iterator i;
998         void *obj;
999
1000         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1001                 "name", app->name,
1002                 "channel_ids", "bridge_ids", "endpoint_ids");
1003         channels = ast_json_object_get(json, "channel_ids");
1004         bridges = ast_json_object_get(json, "bridge_ids");
1005         endpoints = ast_json_object_get(json, "endpoint_ids");
1006
1007         i = ao2_iterator_init(app->forwards, 0);
1008         while ((obj = ao2_iterator_next(&i))) {
1009                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
1010                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
1011                 int append_res = -1;
1012
1013                 id = ast_json_string_create(forwards->id);
1014
1015                 switch (forwards->forward_type) {
1016                 case FORWARD_CHANNEL:
1017                         append_res = ast_json_array_append(channels,
1018                                 ast_json_ref(id));
1019                         break;
1020                 case FORWARD_BRIDGE:
1021                         append_res = ast_json_array_append(bridges,
1022                                 ast_json_ref(id));
1023                         break;
1024                 case FORWARD_ENDPOINT:
1025                         append_res = ast_json_array_append(endpoints,
1026                                 ast_json_ref(id));
1027                         break;
1028                 }
1029
1030                 if (append_res != 0) {
1031                         ast_log(LOG_ERROR, "Error building response\n");
1032                         ao2_iterator_destroy(&i);
1033                         return NULL;
1034                 }
1035         }
1036         ao2_iterator_destroy(&i);
1037
1038         return ast_json_ref(json);
1039 }
1040
1041 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1042 {
1043         int res;
1044
1045         if (!app || !chan) {
1046                 return -1;
1047         } else {
1048                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1049                 SCOPED_AO2LOCK(lock, app->forwards);
1050
1051                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
1052                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1053                 if (!forwards) {
1054                         /* Forwards not found, create one */
1055                         forwards = forwards_create_channel(app, chan);
1056                         if (!forwards) {
1057                                 return -1;
1058                         }
1059
1060                         res = ao2_link_flags(app->forwards, forwards,
1061                                 OBJ_NOLOCK);
1062                         if (!res) {
1063                                 return -1;
1064                         }
1065                 }
1066
1067                 ++forwards->interested;
1068                 ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
1069                 return 0;
1070         }
1071 }
1072
1073 static int subscribe_channel(struct stasis_app *app, void *obj)
1074 {
1075         return app_subscribe_channel(app, obj);
1076 }
1077
1078 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1079 {
1080         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1081         SCOPED_AO2LOCK(lock, app->forwards);
1082
1083         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1084         if (!forwards) {
1085                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1086                 return -1;
1087         }
1088         forwards->interested--;
1089
1090         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1091         if (forwards->interested == 0 || terminate) {
1092                 /* No one is interested any more; unsubscribe */
1093                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1094                 forwards_unsubscribe(forwards);
1095                 ao2_find(app->forwards, forwards,
1096                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1097                         OBJ_NODATA);
1098
1099                 if (!strcmp(kind, "endpoint")) {
1100                         messaging_app_unsubscribe_endpoint(app->name, id);
1101                 }
1102         }
1103
1104         return 0;
1105 }
1106
1107 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1108 {
1109         if (!app || !chan) {
1110                 return -1;
1111         }
1112
1113         return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
1114 }
1115
1116 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
1117 {
1118         if (!app || !channel_id) {
1119                 return -1;
1120         }
1121
1122         return unsubscribe(app, "channel", channel_id, 0);
1123 }
1124
1125 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1126 {
1127         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1128         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1129         return forwards != NULL;
1130 }
1131
1132 int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan)
1133 {
1134         RAII_VAR(struct app_forwards *, old_forwards, NULL, ao2_cleanup);
1135         struct app_forwards *new_forwards;
1136
1137         old_forwards = ao2_find(app->forwards, old_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
1138         if (!old_forwards) {
1139                 return -1;
1140         }
1141
1142         new_forwards = forwards_create_channel(app, new_chan);
1143         if (!new_forwards) {
1144                 return -1;
1145         }
1146
1147         new_forwards->interested = old_forwards->interested;
1148         ao2_link_flags(app->forwards, new_forwards, 0);
1149         ao2_cleanup(new_forwards);
1150
1151         /* Clean up old forwards */
1152         forwards_unsubscribe(old_forwards);
1153         return 0;
1154 }
1155
1156 static void *channel_find(const struct stasis_app *app, const char *id)
1157 {
1158         return ast_channel_get_by_name(id);
1159 }
1160
1161 struct stasis_app_event_source channel_event_source = {
1162         .scheme = "channel:",
1163         .find = channel_find,
1164         .subscribe = subscribe_channel,
1165         .unsubscribe = app_unsubscribe_channel_id,
1166         .is_subscribed = app_is_subscribed_channel_id
1167 };
1168
1169 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1170 {
1171         if (!app || !bridge) {
1172                 return -1;
1173         } else {
1174                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1175                 SCOPED_AO2LOCK(lock, app->forwards);
1176
1177                 forwards = ao2_find(app->forwards, bridge->uniqueid,
1178                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1179
1180                 if (!forwards) {
1181                         /* Forwards not found, create one */
1182                         forwards = forwards_create_bridge(app, bridge);
1183                         if (!forwards) {
1184                                 return -1;
1185                         }
1186                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1187                 }
1188
1189                 ++forwards->interested;
1190                 ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
1191                 return 0;
1192         }
1193 }
1194
1195 static int subscribe_bridge(struct stasis_app *app, void *obj)
1196 {
1197         return app_subscribe_bridge(app, obj);
1198 }
1199
1200 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1201 {
1202         if (!app || !bridge) {
1203                 return -1;
1204         }
1205
1206         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
1207 }
1208
1209 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
1210 {
1211         if (!app || !bridge_id) {
1212                 return -1;
1213         }
1214
1215         return unsubscribe(app, "bridge", bridge_id, 0);
1216 }
1217
1218 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1219 {
1220         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1221         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1222         return forwards != NULL;
1223 }
1224
1225 static void *bridge_find(const struct stasis_app *app, const char *id)
1226 {
1227         return stasis_app_bridge_find_by_id(id);
1228 }
1229
1230 struct stasis_app_event_source bridge_event_source = {
1231         .scheme = "bridge:",
1232         .find = bridge_find,
1233         .subscribe = subscribe_bridge,
1234         .unsubscribe = app_unsubscribe_bridge_id,
1235         .is_subscribed = app_is_subscribed_bridge_id
1236 };
1237
1238 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1239 {
1240         if (!app || !endpoint) {
1241                 return -1;
1242         } else {
1243                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1244                 SCOPED_AO2LOCK(lock, app->forwards);
1245
1246                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
1247                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1248
1249                 if (!forwards) {
1250                         /* Forwards not found, create one */
1251                         forwards = forwards_create_endpoint(app, endpoint);
1252                         if (!forwards) {
1253                                 return -1;
1254                         }
1255                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1256
1257                         /* Subscribe for messages */
1258                         messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1259                 }
1260
1261                 ++forwards->interested;
1262                 ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
1263                 return 0;
1264         }
1265 }
1266
1267 static int subscribe_endpoint(struct stasis_app *app, void *obj)
1268 {
1269         return app_subscribe_endpoint(app, obj);
1270 }
1271
1272 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1273 {
1274         if (!app || !endpoint_id) {
1275                 return -1;
1276         }
1277
1278         return unsubscribe(app, "endpoint", endpoint_id, 0);
1279 }
1280
1281 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1282 {
1283         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1284         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1285         return forwards != NULL;
1286 }
1287
1288 static void *endpoint_find(const struct stasis_app *app, const char *id)
1289 {
1290         return ast_endpoint_find_by_id(id);
1291 }
1292
1293 struct stasis_app_event_source endpoint_event_source = {
1294         .scheme = "endpoint:",
1295         .find = endpoint_find,
1296         .subscribe = subscribe_endpoint,
1297         .unsubscribe = app_unsubscribe_endpoint_id,
1298         .is_subscribed = app_is_subscribed_endpoint_id
1299 };
1300
1301 void stasis_app_register_event_sources(void)
1302 {
1303         stasis_app_register_event_source(&channel_event_source);
1304         stasis_app_register_event_source(&bridge_event_source);
1305         stasis_app_register_event_source(&endpoint_event_source);
1306 }
1307
1308 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1309 {
1310         return obj == &endpoint_event_source ||
1311                 obj == &bridge_event_source ||
1312                 obj == &channel_event_source;
1313 }
1314
1315 void stasis_app_unregister_event_sources(void)
1316 {
1317         stasis_app_unregister_event_source(&endpoint_event_source);
1318         stasis_app_unregister_event_source(&bridge_event_source);
1319         stasis_app_unregister_event_source(&channel_event_source);
1320 }
1321
1322