ARI: Add the ability to subscribe to all events
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_REGISTER_FILE()
29
30 #include "app.h"
31 #include "control.h"
32 #include "messaging.h"
33
34 #include "asterisk/callerid.h"
35 #include "asterisk/stasis_app.h"
36 #include "asterisk/stasis_bridges.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40
41 #define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
42 #define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
43 #define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
44
45 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
46
47 struct stasis_app {
48         /*! Aggregation topic for this application. */
49         struct stasis_topic *topic;
50         /*! Router for handling messages forwarded to \a topic. */
51         struct stasis_message_router *router;
52         /*! Router for handling messages to the bridge all \a topic. */
53         struct stasis_message_router *bridge_router;
54         /*! Optional router for handling endpoint messages in 'all' subscriptions */
55         struct stasis_message_router *endpoint_router;
56         /*! Container of the channel forwards to this app's topic. */
57         struct ao2_container *forwards;
58         /*! Callback function for this application. */
59         stasis_app_cb handler;
60         /*! Opaque data to hand to callback function. */
61         void *data;
62         /*! Subscription model for the application */
63         enum stasis_app_subscription_model subscription_model;
64         /*! Name of the Stasis application */
65         char name[];
66 };
67
68 enum forward_type {
69         FORWARD_CHANNEL,
70         FORWARD_BRIDGE,
71         FORWARD_ENDPOINT,
72 };
73
74 /*! Subscription info for a particular channel/bridge. */
75 struct app_forwards {
76         /*! Count of number of times this channel/bridge has been subscribed */
77         int interested;
78
79         /*! Forward for the regular topic */
80         struct stasis_forward *topic_forward;
81         /*! Forward for the caching topic */
82         struct stasis_forward *topic_cached_forward;
83
84         /* Type of object being forwarded */
85         enum forward_type forward_type;
86         /*! Unique id of the object being forwarded */
87         char id[];
88 };
89
90 static void forwards_dtor(void *obj)
91 {
92 #ifdef AST_DEVMODE
93         struct app_forwards *forwards = obj;
94 #endif /* AST_DEVMODE */
95
96         ast_assert(forwards->topic_forward == NULL);
97         ast_assert(forwards->topic_cached_forward == NULL);
98 }
99
100 static void forwards_unsubscribe(struct app_forwards *forwards)
101 {
102         stasis_forward_cancel(forwards->topic_forward);
103         forwards->topic_forward = NULL;
104         stasis_forward_cancel(forwards->topic_cached_forward);
105         forwards->topic_cached_forward = NULL;
106 }
107
108 static struct app_forwards *forwards_create(struct stasis_app *app,
109         const char *id)
110 {
111         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
112
113         if (!app || ast_strlen_zero(id)) {
114                 return NULL;
115         }
116
117         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
118         if (!forwards) {
119                 return NULL;
120         }
121
122         strcpy(forwards->id, id);
123
124         ao2_ref(forwards, +1);
125         return forwards;
126 }
127
128 /*! Forward a channel's topics to an app */
129 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
130         struct ast_channel *chan)
131 {
132         struct app_forwards *forwards;
133
134         if (!app) {
135                 return NULL;
136         }
137
138         forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
139         if (!forwards) {
140                 return NULL;
141         }
142
143         forwards->forward_type = FORWARD_CHANNEL;
144         if (chan) {
145                 forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
146                         app->topic);
147         }
148         forwards->topic_cached_forward = stasis_forward_all(
149                 chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
150                 app->topic);
151
152         if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
153                 /* Half-subscribed is a bad thing */
154                 forwards_unsubscribe(forwards);
155                 ao2_ref(forwards, -1);
156                 return NULL;
157         }
158
159         return forwards;
160 }
161
162 /*! Forward a bridge's topics to an app */
163 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
164         struct ast_bridge *bridge)
165 {
166         struct app_forwards *forwards;
167
168         if (!app) {
169                 return NULL;
170         }
171
172         forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
173         if (!forwards) {
174                 return NULL;
175         }
176
177         forwards->forward_type = FORWARD_BRIDGE;
178         if (bridge) {
179                 forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
180                         app->topic);
181         }
182         forwards->topic_cached_forward = stasis_forward_all(
183                 bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
184                 app->topic);
185
186         if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
187                 /* Half-subscribed is a bad thing */
188                 forwards_unsubscribe(forwards);
189                 ao2_ref(forwards, -1);
190                 return NULL;
191         }
192
193         return forwards;
194 }
195
196 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
197         struct stasis_message *message)
198 {
199         struct stasis_app *app = data;
200
201         stasis_publish(app->topic, message);
202 }
203
204 /*! Forward a endpoint's topics to an app */
205 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
206         struct ast_endpoint *endpoint)
207 {
208         struct app_forwards *forwards;
209         int ret = 0;
210
211         if (!app) {
212                 return NULL;
213         }
214
215         forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
216         if (!forwards) {
217                 return NULL;
218         }
219
220         forwards->forward_type = FORWARD_ENDPOINT;
221         if (endpoint) {
222                 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
223                         app->topic);
224                 forwards->topic_cached_forward = stasis_forward_all(
225                         ast_endpoint_topic_cached(endpoint), app->topic);
226
227                 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
228                         /* Half-subscribed is a bad thing */
229                         forwards_unsubscribe(forwards);
230                         ao2_ref(forwards, -1);
231                         return NULL;
232                 }
233         } else {
234                 /* Since endpoint subscriptions also subscribe to channels, in the case
235                  * of all endpoint subscriptions, we only want messages for the endpoints.
236                  * As such, we route those particular messages and then re-publish them
237                  * on the app's topic.
238                  */
239                 ast_assert(app->endpoint_router == NULL);
240                 app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
241                 if (!app->endpoint_router) {
242                         forwards_unsubscribe(forwards);
243                         ao2_ref(forwards, -1);
244                         return NULL;
245                 }
246
247                 ret |= stasis_message_router_add(app->endpoint_router,
248                         ast_endpoint_state_type(), endpoint_state_cb, app);
249                 ret |= stasis_message_router_add(app->endpoint_router,
250                         ast_endpoint_contact_state_type(), endpoint_state_cb, app);
251
252                 if (ret) {
253                         ao2_ref(app->endpoint_router, -1);
254                         app->endpoint_router = NULL;
255                         ao2_ref(forwards, -1);
256                         return NULL;
257                 }
258         }
259
260         return forwards;
261 }
262
263 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
264 {
265         const struct app_forwards *object_left = obj_left;
266         const struct app_forwards *object_right = obj_right;
267         const char *right_key = obj_right;
268         int cmp;
269
270         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
271         case OBJ_POINTER:
272                 right_key = object_right->id;
273                 /* Fall through */
274         case OBJ_KEY:
275                 cmp = strcmp(object_left->id, right_key);
276                 break;
277         case OBJ_PARTIAL_KEY:
278                 /*
279                  * We could also use a partial key struct containing a length
280                  * so strlen() does not get called for every comparison instead.
281                  */
282                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
283                 break;
284         default:
285                 /* Sort can only work on something with a full or partial key. */
286                 ast_assert(0);
287                 cmp = 0;
288                 break;
289         }
290         return cmp;
291 }
292
293 static void app_dtor(void *obj)
294 {
295         struct stasis_app *app = obj;
296
297         ast_verb(1, "Destroying Stasis app %s\n", app->name);
298
299         ast_assert(app->router == NULL);
300         ast_assert(app->bridge_router == NULL);
301         ast_assert(app->endpoint_router == NULL);
302
303         ao2_cleanup(app->topic);
304         app->topic = NULL;
305         ao2_cleanup(app->forwards);
306         app->forwards = NULL;
307         ao2_cleanup(app->data);
308         app->data = NULL;
309 }
310
311 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
312 {
313         struct ast_multi_channel_blob *payload = stasis_message_data(message);
314         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
315         struct ast_channel *chan;
316
317         if (!snapshot) {
318                 return;
319         }
320
321         chan = ast_channel_get_by_name(snapshot->uniqueid);
322         if (!chan) {
323                 return;
324         }
325
326         app_subscribe_channel(app, chan);
327         ast_channel_unref(chan);
328 }
329
330 static void sub_default_handler(void *data, struct stasis_subscription *sub,
331         struct stasis_message *message)
332 {
333         struct stasis_app *app = data;
334         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
335
336         if (stasis_subscription_final_message(sub, message)) {
337                 ao2_cleanup(app);
338         }
339
340         if (stasis_message_type(message) == ast_channel_dial_type()) {
341                 call_forwarded_handler(app, message);
342         }
343
344         /* By default, send any message that has a JSON representation */
345         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
346         if (!json) {
347                 return;
348         }
349
350         app_send(app, json);
351 }
352
353 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
354 typedef struct ast_json *(*channel_snapshot_monitor)(
355         struct ast_channel_snapshot *old_snapshot,
356         struct ast_channel_snapshot *new_snapshot,
357         const struct timeval *tv);
358
359 static struct ast_json *simple_channel_event(
360         const char *type,
361         struct ast_channel_snapshot *snapshot,
362         const struct timeval *tv)
363 {
364         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
365
366         if (!json_channel) {
367                 return NULL;
368         }
369
370         return ast_json_pack("{s: s, s: o, s: o}",
371                 "type", type,
372                 "timestamp", ast_json_timeval(*tv, NULL),
373                 "channel", json_channel);
374 }
375
376 static struct ast_json *channel_created_event(
377         struct ast_channel_snapshot *snapshot,
378         const struct timeval *tv)
379 {
380         return simple_channel_event("ChannelCreated", snapshot, tv);
381 }
382
383 static struct ast_json *channel_destroyed_event(
384         struct ast_channel_snapshot *snapshot,
385         const struct timeval *tv)
386 {
387         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
388
389         if (!json_channel) {
390                 return NULL;
391         }
392
393         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
394                 "type", "ChannelDestroyed",
395                 "timestamp", ast_json_timeval(*tv, NULL),
396                 "cause", snapshot->hangupcause,
397                 "cause_txt", ast_cause2str(snapshot->hangupcause),
398                 "channel", json_channel);
399 }
400
401 static struct ast_json *channel_state_change_event(
402         struct ast_channel_snapshot *snapshot,
403         const struct timeval *tv)
404 {
405         return simple_channel_event("ChannelStateChange", snapshot, tv);
406 }
407
408 /*! \brief Handle channel state changes */
409 static struct ast_json *channel_state(
410         struct ast_channel_snapshot *old_snapshot,
411         struct ast_channel_snapshot *new_snapshot,
412         const struct timeval *tv)
413 {
414         struct ast_channel_snapshot *snapshot = new_snapshot ?
415                 new_snapshot : old_snapshot;
416
417         if (!old_snapshot) {
418                 return channel_created_event(snapshot, tv);
419         } else if (!new_snapshot) {
420                 return channel_destroyed_event(snapshot, tv);
421         } else if (old_snapshot->state != new_snapshot->state) {
422                 return channel_state_change_event(snapshot, tv);
423         }
424
425         return NULL;
426 }
427
428 static struct ast_json *channel_dialplan(
429         struct ast_channel_snapshot *old_snapshot,
430         struct ast_channel_snapshot *new_snapshot,
431         const struct timeval *tv)
432 {
433         struct ast_json *json_channel;
434
435         /* No Newexten event on cache clear or first event */
436         if (!old_snapshot || !new_snapshot) {
437                 return NULL;
438         }
439
440         /* Empty application is not valid for a Newexten event */
441         if (ast_strlen_zero(new_snapshot->appl)) {
442                 return NULL;
443         }
444
445         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
446                 return NULL;
447         }
448
449         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
450         if (!json_channel) {
451                 return NULL;
452         }
453
454         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
455                 "type", "ChannelDialplan",
456                 "timestamp", ast_json_timeval(*tv, NULL),
457                 "dialplan_app", new_snapshot->appl,
458                 "dialplan_app_data", new_snapshot->data,
459                 "channel", json_channel);
460 }
461
462 static struct ast_json *channel_callerid(
463         struct ast_channel_snapshot *old_snapshot,
464         struct ast_channel_snapshot *new_snapshot,
465         const struct timeval *tv)
466 {
467         struct ast_json *json_channel;
468
469         /* No NewCallerid event on cache clear or first event */
470         if (!old_snapshot || !new_snapshot) {
471                 return NULL;
472         }
473
474         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
475                 return NULL;
476         }
477
478         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
479         if (!json_channel) {
480                 return NULL;
481         }
482
483         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
484                 "type", "ChannelCallerId",
485                 "timestamp", ast_json_timeval(*tv, NULL),
486                 "caller_presentation", new_snapshot->caller_pres,
487                 "caller_presentation_txt", ast_describe_caller_presentation(
488                         new_snapshot->caller_pres),
489                 "channel", json_channel);
490 }
491
492 static struct ast_json *channel_connected_line(
493         struct ast_channel_snapshot *old_snapshot,
494         struct ast_channel_snapshot *new_snapshot,
495         const struct timeval *tv)
496 {
497         struct ast_json *json_channel;
498
499         /* No ChannelConnectedLine event on cache clear or first event */
500         if (!old_snapshot || !new_snapshot) {
501                 return NULL;
502         }
503
504         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
505                 return NULL;
506         }
507
508         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
509         if (!json_channel) {
510                 return NULL;
511         }
512
513         return ast_json_pack("{s: s, s: o, s: o}",
514                 "type", "ChannelConnectedLine",
515                 "timestamp", ast_json_timeval(*tv, NULL),
516                 "channel", json_channel);
517 }
518
519 static channel_snapshot_monitor channel_monitors[] = {
520         channel_state,
521         channel_dialplan,
522         channel_callerid,
523         channel_connected_line,
524 };
525
526 static void sub_channel_update_handler(void *data,
527         struct stasis_subscription *sub,
528         struct stasis_message *message)
529 {
530         struct stasis_app *app = data;
531         struct stasis_cache_update *update;
532         struct ast_channel_snapshot *new_snapshot;
533         struct ast_channel_snapshot *old_snapshot;
534         const struct timeval *tv;
535         int i;
536
537         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
538
539         update = stasis_message_data(message);
540
541         ast_assert(update->type == ast_channel_snapshot_type());
542
543         new_snapshot = stasis_message_data(update->new_snapshot);
544         old_snapshot = stasis_message_data(update->old_snapshot);
545
546         /* Pull timestamp from the new snapshot, or from the update message
547          * when there isn't one. */
548         tv = update->new_snapshot ?
549                 stasis_message_timestamp(update->new_snapshot) :
550                 stasis_message_timestamp(message);
551
552         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
553                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
554
555                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
556                 if (msg) {
557                         app_send(app, msg);
558                 }
559         }
560
561         if (!new_snapshot && old_snapshot) {
562                 unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
563         }
564 }
565
566 static struct ast_json *simple_endpoint_event(
567         const char *type,
568         struct ast_endpoint_snapshot *snapshot,
569         const struct timeval *tv)
570 {
571         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
572
573         if (!json_endpoint) {
574                 return NULL;
575         }
576
577         return ast_json_pack("{s: s, s: o, s: o}",
578                 "type", type,
579                 "timestamp", ast_json_timeval(*tv, NULL),
580                 "endpoint", json_endpoint);
581 }
582
583 static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
584 {
585         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
586         struct ast_json *json_endpoint;
587         struct stasis_app *app = pvt;
588         char *tech;
589         char *resource;
590
591         tech = ast_strdupa(endpoint_id);
592         resource = strchr(tech, '/');
593         if (resource) {
594                 resource[0] = '\0';
595                 resource++;
596         }
597
598         if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
599                 return -1;
600         }
601
602         snapshot = ast_endpoint_latest_snapshot(tech, resource);
603         if (!snapshot) {
604                 return -1;
605         }
606
607         json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
608         if (!json_endpoint) {
609                 return -1;
610         }
611
612         app_send(app, ast_json_pack("{s: s, s: o, s: o, s: O}",
613                 "type", "TextMessageReceived",
614                 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
615                 "endpoint", json_endpoint,
616                 "message", json_msg));
617
618         return 0;
619 }
620
621 static void sub_endpoint_update_handler(void *data,
622         struct stasis_subscription *sub,
623         struct stasis_message *message)
624 {
625         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
626         struct stasis_app *app = data;
627         struct stasis_cache_update *update;
628         struct ast_endpoint_snapshot *new_snapshot;
629         struct ast_endpoint_snapshot *old_snapshot;
630         const struct timeval *tv;
631
632         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
633
634         update = stasis_message_data(message);
635
636         ast_assert(update->type == ast_endpoint_snapshot_type());
637
638         new_snapshot = stasis_message_data(update->new_snapshot);
639         old_snapshot = stasis_message_data(update->old_snapshot);
640
641         if (new_snapshot) {
642                 tv = stasis_message_timestamp(update->new_snapshot);
643
644                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
645                 if (!json) {
646                         return;
647                 }
648
649                 app_send(app, json);
650         }
651
652         if (!new_snapshot && old_snapshot) {
653                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
654         }
655 }
656
657 static struct ast_json *simple_bridge_event(
658         const char *type,
659         struct ast_bridge_snapshot *snapshot,
660         const struct timeval *tv)
661 {
662         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
663         if (!json_bridge) {
664                 return NULL;
665         }
666
667         return ast_json_pack("{s: s, s: o, s: o}",
668                 "type", type,
669                 "timestamp", ast_json_timeval(*tv, NULL),
670                 "bridge", json_bridge);
671 }
672
673 static void sub_bridge_update_handler(void *data,
674         struct stasis_subscription *sub,
675         struct stasis_message *message)
676 {
677         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
678         struct stasis_app *app = data;
679         struct stasis_cache_update *update;
680         struct ast_bridge_snapshot *new_snapshot;
681         struct ast_bridge_snapshot *old_snapshot;
682         const struct timeval *tv;
683
684         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
685
686         update = stasis_message_data(message);
687
688         ast_assert(update->type == ast_bridge_snapshot_type());
689
690         new_snapshot = stasis_message_data(update->new_snapshot);
691         old_snapshot = stasis_message_data(update->old_snapshot);
692         tv = update->new_snapshot ?
693                 stasis_message_timestamp(update->new_snapshot) :
694                 stasis_message_timestamp(message);
695
696         if (!new_snapshot) {
697                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
698         } else if (!old_snapshot) {
699                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
700         }
701
702         if (json) {
703                 app_send(app, json);
704         }
705
706         if (!new_snapshot && old_snapshot) {
707                 unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
708         }
709 }
710
711
712 /*! \brief Helper function for determining if the application is subscribed to a given entity */
713 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
714 {
715         struct app_forwards *forwards = NULL;
716
717         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
718         if (!forwards) {
719                 return 0;
720         }
721
722         ao2_ref(forwards, -1);
723         return 1;
724 }
725
726 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
727         struct stasis_message *message)
728 {
729         struct stasis_app *app = data;
730         struct ast_bridge_merge_message *merge;
731
732         merge = stasis_message_data(message);
733
734         /* Find out if we're subscribed to either bridge */
735         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
736                 bridge_app_subscribed(app, merge->to->uniqueid)) {
737                 /* Forward the message to the app */
738                 stasis_publish(app->topic, message);
739         }
740 }
741
742 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
743 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
744 {
745         int subscribed = 0;
746         struct ao2_iterator iter;
747         char *uniqueid;
748
749         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
750                 return 1;
751         }
752
753         iter = ao2_iterator_init(snapshot->channels, 0);
754         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
755                 if (bridge_app_subscribed(app, uniqueid)) {
756                         subscribed = 1;
757                         ao2_ref(uniqueid, -1);
758                         break;
759                 }
760         }
761         ao2_iterator_destroy(&iter);
762
763         return subscribed;
764 }
765
766 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
767         struct stasis_message *message)
768 {
769         struct stasis_app *app = data;
770         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
771         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
772
773         if (bridge_app_subscribed(app, transfer_msg->transferer->uniqueid) ||
774                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
775                 stasis_publish(app->topic, message);
776         }
777 }
778
779 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
780         struct stasis_message *message)
781 {
782         struct stasis_app *app = data;
783         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
784         int subscribed = 0;
785
786         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
787         if (!subscribed) {
788                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
789         }
790         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
791                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
792         }
793         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
794                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
795         }
796
797         if (!subscribed) {
798                 switch (transfer_msg->dest_type) {
799                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
800                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
801                         break;
802                 case AST_ATTENDED_TRANSFER_DEST_LINK:
803                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
804                         if (!subscribed) {
805                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
806                         }
807                         break;
808                 break;
809                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
810                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
811                         if (!subscribed) {
812                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
813                         }
814                         break;
815                 default:
816                         break;
817                 }
818         }
819
820         if (subscribed) {
821                 stasis_publish(app->topic, message);
822         }
823 }
824
825 static void bridge_default_handler(void *data, struct stasis_subscription *sub,
826         struct stasis_message *message)
827 {
828         struct stasis_app *app = data;
829
830         if (stasis_subscription_final_message(sub, message)) {
831                 ao2_cleanup(app);
832         }
833 }
834
835 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
836 {
837         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
838         size_t size;
839         int res = 0;
840
841         ast_assert(name != NULL);
842         ast_assert(handler != NULL);
843
844         ast_verb(1, "Creating Stasis app '%s'\n", name);
845
846         size = sizeof(*app) + strlen(name) + 1;
847         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
848         if (!app) {
849                 return NULL;
850         }
851         app->subscription_model = subscription_model;
852
853         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
854                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
855                 forwards_sort, NULL);
856         if (!app->forwards) {
857                 return NULL;
858         }
859
860         app->topic = stasis_topic_create(name);
861         if (!app->topic) {
862                 return NULL;
863         }
864
865         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
866         if (!app->bridge_router) {
867                 return NULL;
868         }
869
870         res |= stasis_message_router_add(app->bridge_router,
871                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
872
873         res |= stasis_message_router_add(app->bridge_router,
874                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
875
876         res |= stasis_message_router_add(app->bridge_router,
877                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
878
879         res |= stasis_message_router_set_default(app->bridge_router,
880                 bridge_default_handler, app);
881
882         if (res != 0) {
883                 return NULL;
884         }
885         /* Bridge router holds a reference */
886         ao2_ref(app, +1);
887
888         app->router = stasis_message_router_create(app->topic);
889         if (!app->router) {
890                 return NULL;
891         }
892
893         res |= stasis_message_router_add_cache_update(app->router,
894                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
895
896         res |= stasis_message_router_add_cache_update(app->router,
897                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
898
899         res |= stasis_message_router_add_cache_update(app->router,
900                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
901
902         res |= stasis_message_router_set_default(app->router,
903                 sub_default_handler, app);
904
905         if (res != 0) {
906                 return NULL;
907         }
908         /* Router holds a reference */
909         ao2_ref(app, +1);
910
911         strncpy(app->name, name, size - sizeof(*app));
912         app->handler = handler;
913         app->data = ao2_bump(data);
914
915         ao2_ref(app, +1);
916         return app;
917 }
918
919 struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
920 {
921         return app->topic;
922 }
923
924 /*!
925  * \brief Send a message to the given application.
926  * \param app App to send the message to.
927  * \param message Message to send.
928  */
929 void app_send(struct stasis_app *app, struct ast_json *message)
930 {
931         stasis_app_cb handler;
932         RAII_VAR(void *, data, NULL, ao2_cleanup);
933
934         /* Copy off mutable state with lock held */
935         {
936                 SCOPED_AO2LOCK(lock, app);
937                 handler = app->handler;
938                 if (app->data) {
939                         ao2_ref(app->data, +1);
940                         data = app->data;
941                 }
942                 /* Name is immutable; no need to copy */
943         }
944
945         if (!handler) {
946                 ast_verb(3,
947                         "Inactive Stasis app '%s' missed message\n", app->name);
948                 return;
949         }
950
951         handler(data, app->name, message);
952 }
953
954 void app_deactivate(struct stasis_app *app)
955 {
956         SCOPED_AO2LOCK(lock, app);
957         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
958         app->handler = NULL;
959         ao2_cleanup(app->data);
960         app->data = NULL;
961 }
962
963 void app_shutdown(struct stasis_app *app)
964 {
965         SCOPED_AO2LOCK(lock, app);
966
967         ast_assert(app_is_finished(app));
968
969         stasis_message_router_unsubscribe(app->router);
970         app->router = NULL;
971         stasis_message_router_unsubscribe(app->bridge_router);
972         app->bridge_router = NULL;
973         stasis_message_router_unsubscribe(app->endpoint_router);
974         app->endpoint_router = NULL;
975 }
976
977 int app_is_active(struct stasis_app *app)
978 {
979         SCOPED_AO2LOCK(lock, app);
980         return app->handler != NULL;
981 }
982
983 int app_is_finished(struct stasis_app *app)
984 {
985         SCOPED_AO2LOCK(lock, app);
986
987         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
988 }
989
990 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
991 {
992         SCOPED_AO2LOCK(lock, app);
993
994         if (app->handler && app->data) {
995                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
996
997                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
998
999                 msg = ast_json_pack("{s: s, s: s}",
1000                         "type", "ApplicationReplaced",
1001                         "application", app->name);
1002                 if (msg) {
1003                         app_send(app, msg);
1004                 }
1005         } else {
1006                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1007         }
1008
1009         app->handler = handler;
1010         ao2_cleanup(app->data);
1011         if (data) {
1012                 ao2_ref(data, +1);
1013         }
1014         app->data = data;
1015 }
1016
1017 const char *app_name(const struct stasis_app *app)
1018 {
1019         return app->name;
1020 }
1021
1022 struct ast_json *app_to_json(const struct stasis_app *app)
1023 {
1024         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
1025         struct ast_json *channels;
1026         struct ast_json *bridges;
1027         struct ast_json *endpoints;
1028         struct ao2_iterator i;
1029         void *obj;
1030
1031         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1032                 "name", app->name,
1033                 "channel_ids", "bridge_ids", "endpoint_ids");
1034         channels = ast_json_object_get(json, "channel_ids");
1035         bridges = ast_json_object_get(json, "bridge_ids");
1036         endpoints = ast_json_object_get(json, "endpoint_ids");
1037
1038         i = ao2_iterator_init(app->forwards, 0);
1039         while ((obj = ao2_iterator_next(&i))) {
1040                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
1041                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
1042                 int append_res = -1;
1043
1044                 id = ast_json_string_create(forwards->id);
1045
1046                 switch (forwards->forward_type) {
1047                 case FORWARD_CHANNEL:
1048                         append_res = ast_json_array_append(channels,
1049                                 ast_json_ref(id));
1050                         break;
1051                 case FORWARD_BRIDGE:
1052                         append_res = ast_json_array_append(bridges,
1053                                 ast_json_ref(id));
1054                         break;
1055                 case FORWARD_ENDPOINT:
1056                         append_res = ast_json_array_append(endpoints,
1057                                 ast_json_ref(id));
1058                         break;
1059                 }
1060
1061                 if (append_res != 0) {
1062                         ast_log(LOG_ERROR, "Error building response\n");
1063                         ao2_iterator_destroy(&i);
1064                         return NULL;
1065                 }
1066         }
1067         ao2_iterator_destroy(&i);
1068
1069         return ast_json_ref(json);
1070 }
1071
1072 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1073 {
1074         struct app_forwards *forwards;
1075         SCOPED_AO2LOCK(lock, app->forwards);
1076         int res;
1077
1078         if (!app) {
1079                 return -1;
1080         }
1081
1082         /* If subscribed to all, don't subscribe again */
1083         forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1084         if (forwards) {
1085                 ao2_ref(forwards, -1);
1086                 return 0;
1087         }
1088
1089         forwards = ao2_find(app->forwards,
1090                 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1091                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1092         if (!forwards) {
1093                 /* Forwards not found, create one */
1094                 forwards = forwards_create_channel(app, chan);
1095                 if (!forwards) {
1096                         return -1;
1097                 }
1098
1099                 res = ao2_link_flags(app->forwards, forwards,
1100                         OBJ_NOLOCK);
1101                 if (!res) {
1102                         ao2_ref(forwards, -1);
1103                         return -1;
1104                 }
1105         }
1106
1107         ++forwards->interested;
1108         ast_debug(3, "Channel '%s' is %d interested in %s\n",
1109                 chan ? ast_channel_uniqueid(chan) : "ALL",
1110                 forwards->interested,
1111                 app->name);
1112
1113         ao2_ref(forwards, -1);
1114         return 0;
1115 }
1116
1117 static int subscribe_channel(struct stasis_app *app, void *obj)
1118 {
1119         return app_subscribe_channel(app, obj);
1120 }
1121
1122 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1123 {
1124         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1125         SCOPED_AO2LOCK(lock, app->forwards);
1126
1127         if (!id) {
1128                 if (!strcmp(kind, "bridge")) {
1129                         id = BRIDGE_ALL;
1130                 } else if (!strcmp(kind, "channel")) {
1131                         id = CHANNEL_ALL;
1132                 } else if (!strcmp(kind, "endpoint")) {
1133                         id = ENDPOINT_ALL;
1134                 } else {
1135                         ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1136                         return -1;
1137                 }
1138         }
1139
1140         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1141         if (!forwards) {
1142                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1143                 return -1;
1144         }
1145         forwards->interested--;
1146
1147         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1148         if (forwards->interested == 0 || terminate) {
1149                 /* No one is interested any more; unsubscribe */
1150                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1151                 forwards_unsubscribe(forwards);
1152                 ao2_find(app->forwards, forwards,
1153                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1154                         OBJ_NODATA);
1155
1156                 if (!strcmp(kind, "endpoint")) {
1157                         messaging_app_unsubscribe_endpoint(app->name, id);
1158                 }
1159         }
1160
1161         return 0;
1162 }
1163
1164 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1165 {
1166         if (!app) {
1167                 return -1;
1168         }
1169
1170         return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1171 }
1172
1173 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
1174 {
1175         if (!app) {
1176                 return -1;
1177         }
1178
1179         return unsubscribe(app, "channel", channel_id, 0);
1180 }
1181
1182 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1183 {
1184         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1185
1186         if (ast_strlen_zero(channel_id)) {
1187                 channel_id = CHANNEL_ALL;
1188         }
1189         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1190         return forwards != NULL;
1191 }
1192
1193 static void *channel_find(const struct stasis_app *app, const char *id)
1194 {
1195         return ast_channel_get_by_name(id);
1196 }
1197
1198 struct stasis_app_event_source channel_event_source = {
1199         .scheme = "channel:",
1200         .find = channel_find,
1201         .subscribe = subscribe_channel,
1202         .unsubscribe = app_unsubscribe_channel_id,
1203         .is_subscribed = app_is_subscribed_channel_id
1204 };
1205
1206 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1207 {
1208         struct app_forwards *forwards;
1209         SCOPED_AO2LOCK(lock, app->forwards);
1210
1211         if (!app) {
1212                 return -1;
1213         }
1214
1215         /* If subscribed to all, don't subscribe again */
1216         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1217         if (forwards) {
1218                 ao2_ref(forwards, -1);
1219                 return 0;
1220         }
1221
1222         forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
1223                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1224         if (!forwards) {
1225                 /* Forwards not found, create one */
1226                 forwards = forwards_create_bridge(app, bridge);
1227                 if (!forwards) {
1228                         return -1;
1229                 }
1230                 ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1231         }
1232
1233         ++forwards->interested;
1234         ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1235                 bridge ? bridge->uniqueid : "ALL",
1236                 forwards->interested,
1237                 app->name);
1238
1239         ao2_ref(forwards, -1);
1240         return 0;
1241 }
1242
1243 static int subscribe_bridge(struct stasis_app *app, void *obj)
1244 {
1245         return app_subscribe_bridge(app, obj);
1246 }
1247
1248 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1249 {
1250         if (!app) {
1251                 return -1;
1252         }
1253
1254         return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1255 }
1256
1257 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
1258 {
1259         if (!app) {
1260                 return -1;
1261         }
1262
1263         return unsubscribe(app, "bridge", bridge_id, 0);
1264 }
1265
1266 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1267 {
1268         struct app_forwards *forwards;
1269         SCOPED_AO2LOCK(lock, app->forwards);
1270
1271         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1272         if (forwards) {
1273                 ao2_ref(forwards, -1);
1274                 return 1;
1275         }
1276
1277         if (ast_strlen_zero(bridge_id)) {
1278                 bridge_id = BRIDGE_ALL;
1279         }
1280
1281         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1282         if (forwards) {
1283                 ao2_ref(forwards, -1);
1284                 return 1;
1285         }
1286
1287         return 0;
1288 }
1289
1290 static void *bridge_find(const struct stasis_app *app, const char *id)
1291 {
1292         return stasis_app_bridge_find_by_id(id);
1293 }
1294
1295 struct stasis_app_event_source bridge_event_source = {
1296         .scheme = "bridge:",
1297         .find = bridge_find,
1298         .subscribe = subscribe_bridge,
1299         .unsubscribe = app_unsubscribe_bridge_id,
1300         .is_subscribed = app_is_subscribed_bridge_id
1301 };
1302
1303 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1304 {
1305         struct app_forwards *forwards;
1306         SCOPED_AO2LOCK(lock, app->forwards);
1307
1308         if (!app) {
1309                 return -1;
1310         }
1311
1312         /* If subscribed to all, don't subscribe again */
1313         forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1314         if (forwards) {
1315                 ao2_ref(forwards, -1);
1316                 return 0;
1317         }
1318
1319         forwards = ao2_find(app->forwards,
1320                 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1321                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1322         if (!forwards) {
1323                 /* Forwards not found, create one */
1324                 forwards = forwards_create_endpoint(app, endpoint);
1325                 if (!forwards) {
1326                         return -1;
1327                 }
1328                 ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1329
1330                 /* Subscribe for messages */
1331                 messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1332         }
1333
1334         ++forwards->interested;
1335         ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1336                 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1337                 forwards->interested,
1338                 app->name);
1339
1340         ao2_ref(forwards, -1);
1341         return 0;
1342 }
1343
1344 static int subscribe_endpoint(struct stasis_app *app, void *obj)
1345 {
1346         return app_subscribe_endpoint(app, obj);
1347 }
1348
1349 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1350 {
1351         if (!app) {
1352                 return -1;
1353         }
1354
1355         return unsubscribe(app, "endpoint", endpoint_id, 0);
1356 }
1357
1358 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1359 {
1360         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1361
1362         if (ast_strlen_zero(endpoint_id)) {
1363                 endpoint_id = ENDPOINT_ALL;
1364         }
1365         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1366         return forwards != NULL;
1367 }
1368
1369 static void *endpoint_find(const struct stasis_app *app, const char *id)
1370 {
1371         return ast_endpoint_find_by_id(id);
1372 }
1373
1374 struct stasis_app_event_source endpoint_event_source = {
1375         .scheme = "endpoint:",
1376         .find = endpoint_find,
1377         .subscribe = subscribe_endpoint,
1378         .unsubscribe = app_unsubscribe_endpoint_id,
1379         .is_subscribed = app_is_subscribed_endpoint_id
1380 };
1381
1382 void stasis_app_register_event_sources(void)
1383 {
1384         stasis_app_register_event_source(&channel_event_source);
1385         stasis_app_register_event_source(&bridge_event_source);
1386         stasis_app_register_event_source(&endpoint_event_source);
1387 }
1388
1389 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1390 {
1391         return obj == &endpoint_event_source ||
1392                 obj == &bridge_event_source ||
1393                 obj == &channel_event_source;
1394 }
1395
1396 void stasis_app_unregister_event_sources(void)
1397 {
1398         stasis_app_unregister_event_source(&endpoint_event_source);
1399         stasis_app_unregister_event_source(&bridge_event_source);
1400         stasis_app_unregister_event_source(&channel_event_source);
1401 }
1402
1403