You'd think that new files would be free of whitespace issues. But you would be...
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_endpoints.h"
37 #include "asterisk/stasis_message_router.h"
38
/*!
 * Runtime state for one Stasis application.
 *
 * Instances are ao2 objects created by app_create() and torn down by
 * app_dtor().
 */
struct app {
	/*! Aggregation topic for this application. */
	struct stasis_topic *topic;
	/*! Router for handling messages forwarded to \a topic. */
	struct stasis_message_router *router;
	/*! Subscription to watch for bridge merge messages */
	struct stasis_subscription *bridge_merge_sub;
	/*! Container of the channel forwards to this app's topic. */
	struct ao2_container *forwards;
	/*! Callback function for this application (NULL while deactivated). */
	stasis_app_cb handler;
	/*! Opaque data to hand to callback function (an ao2 reference is held). */
	void *data;
	/*!
	 * Name of the Stasis application. Flexible array member: app_create()
	 * sizes the allocation as sizeof(struct app) + strlen(name) + 1.
	 */
	char name[];
};
55
/*!
 * Kind of object an app_forwards entry refers to. app_to_json() uses it to
 * decide which id list (channels, bridges, endpoints) an entry belongs to.
 */
enum forward_type {
	FORWARD_CHANNEL,
	FORWARD_BRIDGE,
	FORWARD_ENDPOINT,
};
61
/*! Subscription info for a particular channel/bridge/endpoint. */
struct app_forwards {
	/*! Count of number of times this object has been subscribed */
	int interested;

	/*! Forward for the regular topic */
	struct stasis_forward *topic_forward;
	/*! Forward for the caching topic */
	struct stasis_forward *topic_cached_forward;

	/*! Type of object being forwarded */
	enum forward_type forward_type;
	/*!
	 * Unique id of the object being forwarded. Flexible array member;
	 * forwards_create() allocates strlen(id) + 1 extra bytes for it.
	 */
	char id[];
};
77
/*!
 * Destructor registered for app_forwards objects by forwards_create().
 *
 * It releases nothing itself; it only asserts that both topic forwards
 * were already cancelled and cleared (see forwards_unsubscribe()).
 */
static void forwards_dtor(void *obj)
{
	/* NOTE(review): the local is declared only under AST_DEVMODE,
	 * presumably because ast_assert() expands to nothing otherwise and
	 * the variable would be unused -- confirm against ast_assert. */
#ifdef AST_DEVMODE
	struct app_forwards *forwards = obj;
#endif /* AST_DEVMODE */

	ast_assert(forwards->topic_forward == NULL);
	ast_assert(forwards->topic_cached_forward == NULL);
}
87
88 static void forwards_unsubscribe(struct app_forwards *forwards)
89 {
90         stasis_forward_cancel(forwards->topic_forward);
91         forwards->topic_forward = NULL;
92         stasis_forward_cancel(forwards->topic_cached_forward);
93         forwards->topic_cached_forward = NULL;
94 }
95
96 static struct app_forwards *forwards_create(struct app *app,
97         const char *id)
98 {
99         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
100
101         if (!app || ast_strlen_zero(id)) {
102                 return NULL;
103         }
104
105         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
106         if (!forwards) {
107                 return NULL;
108         }
109
110         strcpy(forwards->id, id);
111
112         ao2_ref(forwards, +1);
113         return forwards;
114 }
115
116 /*! Forward a channel's topics to an app */
117 static struct app_forwards *forwards_create_channel(struct app *app,
118         struct ast_channel *chan)
119 {
120         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
121
122         if (!app || !chan) {
123                 return NULL;
124         }
125
126         forwards = forwards_create(app, ast_channel_uniqueid(chan));
127         if (!forwards) {
128                 return NULL;
129         }
130
131         forwards->forward_type = FORWARD_CHANNEL;
132         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
133                 app->topic);
134         if (!forwards->topic_forward) {
135                 return NULL;
136         }
137
138         forwards->topic_cached_forward = stasis_forward_all(
139                 ast_channel_topic_cached(chan), app->topic);
140         if (!forwards->topic_cached_forward) {
141                 /* Half-subscribed is a bad thing */
142                 stasis_forward_cancel(forwards->topic_forward);
143                 forwards->topic_forward = NULL;
144                 return NULL;
145         }
146
147         ao2_ref(forwards, +1);
148         return forwards;
149 }
150
151 /*! Forward a bridge's topics to an app */
152 static struct app_forwards *forwards_create_bridge(struct app *app,
153         struct ast_bridge *bridge)
154 {
155         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
156
157         if (!app || !bridge) {
158                 return NULL;
159         }
160
161         forwards = forwards_create(app, bridge->uniqueid);
162         if (!forwards) {
163                 return NULL;
164         }
165
166         forwards->forward_type = FORWARD_BRIDGE;
167         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
168                 app->topic);
169         if (!forwards->topic_forward) {
170                 return NULL;
171         }
172
173         forwards->topic_cached_forward = stasis_forward_all(
174                 ast_bridge_topic_cached(bridge), app->topic);
175         if (!forwards->topic_cached_forward) {
176                 /* Half-subscribed is a bad thing */
177                 stasis_forward_cancel(forwards->topic_forward);
178                 forwards->topic_forward = NULL;
179                 return NULL;
180         }
181
182         ao2_ref(forwards, +1);
183         return forwards;
184 }
185
186 /*! Forward a endpoint's topics to an app */
187 static struct app_forwards *forwards_create_endpoint(struct app *app,
188         struct ast_endpoint *endpoint)
189 {
190         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
191
192         if (!app || !endpoint) {
193                 return NULL;
194         }
195
196         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
197         if (!forwards) {
198                 return NULL;
199         }
200
201         forwards->forward_type = FORWARD_ENDPOINT;
202         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
203                 app->topic);
204         if (!forwards->topic_forward) {
205                 return NULL;
206         }
207
208         forwards->topic_cached_forward = stasis_forward_all(
209                 ast_endpoint_topic_cached(endpoint), app->topic);
210         if (!forwards->topic_cached_forward) {
211                 /* Half-subscribed is a bad thing */
212                 stasis_forward_cancel(forwards->topic_forward);
213                 forwards->topic_forward = NULL;
214                 return NULL;
215         }
216
217         ao2_ref(forwards, +1);
218         return forwards;
219 }
220
221 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
222 {
223         const struct app_forwards *object_left = obj_left;
224         const struct app_forwards *object_right = obj_right;
225         const char *right_key = obj_right;
226         int cmp;
227
228         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
229         case OBJ_POINTER:
230                 right_key = object_right->id;
231                 /* Fall through */
232         case OBJ_KEY:
233                 cmp = strcmp(object_left->id, right_key);
234                 break;
235         case OBJ_PARTIAL_KEY:
236                 /*
237                  * We could also use a partial key struct containing a length
238                  * so strlen() does not get called for every comparison instead.
239                  */
240                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
241                 break;
242         default:
243                 /* Sort can only work on something with a full or partial key. */
244                 ast_assert(0);
245                 cmp = 0;
246                 break;
247         }
248         return cmp;
249 }
250
251 static void app_dtor(void *obj)
252 {
253         struct app *app = obj;
254
255         ast_verb(1, "Destroying Stasis app %s\n", app->name);
256
257         ast_assert(app->router == NULL);
258         ast_assert(app->bridge_merge_sub == NULL);
259
260         ao2_cleanup(app->topic);
261         app->topic = NULL;
262         ao2_cleanup(app->forwards);
263         app->forwards = NULL;
264         ao2_cleanup(app->data);
265         app->data = NULL;
266 }
267
268 static void sub_default_handler(void *data, struct stasis_subscription *sub,
269         struct stasis_message *message)
270 {
271         struct app *app = data;
272         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
273
274         if (stasis_subscription_final_message(sub, message)) {
275                 ao2_cleanup(app);
276         }
277
278         /* By default, send any message that has a JSON representation */
279         json = stasis_message_to_json(message);
280         if (!json) {
281                 return;
282         }
283
284         app_send(app, json);
285 }
286
/*!
 * \brief Typedef for callbacks that get called on channel snapshot updates
 *
 * Either snapshot may be NULL; the monitors in this file check for that.
 * A monitor returns a JSON event to send to the app, or NULL when the
 * update is of no interest to it.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
292
293 static struct ast_json *simple_channel_event(
294         const char *type,
295         struct ast_channel_snapshot *snapshot,
296         const struct timeval *tv)
297 {
298         return ast_json_pack("{s: s, s: o, s: o}",
299                 "type", type,
300                 "timestamp", ast_json_timeval(*tv, NULL),
301                 "channel", ast_channel_snapshot_to_json(snapshot));
302 }
303
/*! Build the "ChannelCreated" event for \a snapshot at time \a tv. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelCreated", snapshot, tv);

	return event;
}
310
311 static struct ast_json *channel_destroyed_event(
312         struct ast_channel_snapshot *snapshot,
313         const struct timeval *tv)
314 {
315         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
316                 "type", "ChannelDestroyed",
317                 "timestamp", ast_json_timeval(*tv, NULL),
318                 "cause", snapshot->hangupcause,
319                 "cause_txt", ast_cause2str(snapshot->hangupcause),
320                 "channel", ast_channel_snapshot_to_json(snapshot));
321 }
322
/*! Build the "ChannelStateChange" event for \a snapshot at time \a tv. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelStateChange", snapshot, tv);

	return event;
}
329
330 /*! \brief Handle channel state changes */
331 static struct ast_json *channel_state(
332         struct ast_channel_snapshot *old_snapshot,
333         struct ast_channel_snapshot *new_snapshot,
334         const struct timeval *tv)
335 {
336         struct ast_channel_snapshot *snapshot = new_snapshot ?
337                 new_snapshot : old_snapshot;
338
339         if (!old_snapshot) {
340                 return channel_created_event(snapshot, tv);
341         } else if (!new_snapshot) {
342                 return channel_destroyed_event(snapshot, tv);
343         } else if (old_snapshot->state != new_snapshot->state) {
344                 return channel_state_change_event(snapshot, tv);
345         }
346
347         return NULL;
348 }
349
350 static struct ast_json *channel_dialplan(
351         struct ast_channel_snapshot *old_snapshot,
352         struct ast_channel_snapshot *new_snapshot,
353         const struct timeval *tv)
354 {
355         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
356
357         /* No Newexten event on cache clear or first event */
358         if (!old_snapshot || !new_snapshot) {
359                 return NULL;
360         }
361
362         /* Empty application is not valid for a Newexten event */
363         if (ast_strlen_zero(new_snapshot->appl)) {
364                 return NULL;
365         }
366
367         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
368                 return NULL;
369         }
370
371         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
372                 "type", "ChannelDialplan",
373                 "timestamp", ast_json_timeval(*tv, NULL),
374                 "dialplan_app", new_snapshot->appl,
375                 "dialplan_app_data", new_snapshot->data,
376                 "channel", ast_channel_snapshot_to_json(new_snapshot));
377 }
378
379 static struct ast_json *channel_callerid(
380         struct ast_channel_snapshot *old_snapshot,
381         struct ast_channel_snapshot *new_snapshot,
382         const struct timeval *tv)
383 {
384         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
385
386         /* No NewCallerid event on cache clear or first event */
387         if (!old_snapshot || !new_snapshot) {
388                 return NULL;
389         }
390
391         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
392                 return NULL;
393         }
394
395         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
396                 "type", "ChannelCallerId",
397                 "timestamp", ast_json_timeval(*tv, NULL),
398                 "caller_presentation", new_snapshot->caller_pres,
399                 "caller_presentation_txt", ast_describe_caller_presentation(
400                         new_snapshot->caller_pres),
401                 "channel", ast_channel_snapshot_to_json(new_snapshot));
402 }
403
/*!
 * Monitors run, in this order, by sub_channel_update_handler() for every
 * channel snapshot update; each non-NULL result is sent to the app.
 */
static channel_snapshot_monitor channel_monitors[] = {
	channel_state,
	channel_dialplan,
	channel_callerid
};
409
410 static void sub_channel_update_handler(void *data,
411         struct stasis_subscription *sub,
412         struct stasis_message *message)
413 {
414         struct app *app = data;
415         struct stasis_cache_update *update;
416         struct ast_channel_snapshot *new_snapshot;
417         struct ast_channel_snapshot *old_snapshot;
418         const struct timeval *tv;
419         int i;
420
421         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
422
423         update = stasis_message_data(message);
424
425         ast_assert(update->type == ast_channel_snapshot_type());
426
427         new_snapshot = stasis_message_data(update->new_snapshot);
428         old_snapshot = stasis_message_data(update->old_snapshot);
429
430         /* Pull timestamp from the new snapshot, or from the update message
431          * when there isn't one. */
432         tv = update->new_snapshot ?
433                 stasis_message_timestamp(update->new_snapshot) :
434                 stasis_message_timestamp(message);
435
436         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
437                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
438
439                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
440                 if (msg) {
441                         app_send(app, msg);
442                 }
443         }
444 }
445
446 static struct ast_json *simple_endpoint_event(
447         const char *type,
448         struct ast_endpoint_snapshot *snapshot,
449         const struct timeval *tv)
450 {
451         return ast_json_pack("{s: s, s: o, s: o}",
452                 "type", type,
453                 "timestamp", ast_json_timeval(*tv, NULL),
454                 "endpoint", ast_endpoint_snapshot_to_json(snapshot));
455 }
456
457 static void sub_endpoint_update_handler(void *data,
458         struct stasis_subscription *sub,
459         struct stasis_message *message)
460 {
461         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
462         struct app *app = data;
463         struct stasis_cache_update *update;
464         struct ast_endpoint_snapshot *new_snapshot;
465         const struct timeval *tv;
466
467         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
468
469         update = stasis_message_data(message);
470
471         ast_assert(update->type == ast_endpoint_snapshot_type());
472
473         new_snapshot = stasis_message_data(update->new_snapshot);
474         tv = update->new_snapshot ?
475                 stasis_message_timestamp(update->new_snapshot) :
476                 stasis_message_timestamp(message);
477
478         json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
479
480         if (!json) {
481                 return;
482         }
483
484         app_send(app, json);
485 }
486
487 static struct ast_json *simple_bridge_event(
488         const char *type,
489         struct ast_bridge_snapshot *snapshot,
490         const struct timeval *tv)
491 {
492         return ast_json_pack("{s: s, s: o, s: o}",
493                 "type", type,
494                 "timestamp", ast_json_timeval(*tv, NULL),
495                 "bridge", ast_bridge_snapshot_to_json(snapshot));
496 }
497
498 static void sub_bridge_update_handler(void *data,
499         struct stasis_subscription *sub,
500         struct stasis_message *message)
501 {
502         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
503         struct app *app = data;
504         struct stasis_cache_update *update;
505         struct ast_bridge_snapshot *new_snapshot;
506         struct ast_bridge_snapshot *old_snapshot;
507         const struct timeval *tv;
508
509         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
510
511         update = stasis_message_data(message);
512
513         ast_assert(update->type == ast_bridge_snapshot_type());
514
515         new_snapshot = stasis_message_data(update->new_snapshot);
516         old_snapshot = stasis_message_data(update->old_snapshot);
517         tv = update->new_snapshot ?
518                 stasis_message_timestamp(update->new_snapshot) :
519                 stasis_message_timestamp(message);
520
521         if (!new_snapshot) {
522                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
523         } else if (!old_snapshot) {
524                 json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
525         }
526
527         if (!json) {
528                 return;
529         }
530
531         app_send(app, json);
532 }
533
534 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
535         struct stasis_message *message)
536 {
537         struct app *app = data;
538         struct ast_bridge_merge_message *merge;
539         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
540
541         if (stasis_subscription_final_message(sub, message)) {
542                 ao2_cleanup(app);
543         }
544
545         if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
546                 return;
547         }
548
549         merge = stasis_message_data(message);
550
551         /* Find out if we're subscribed to either bridge */
552         forwards = ao2_find(app->forwards, merge->from->uniqueid,
553                 OBJ_SEARCH_KEY);
554         if (!forwards) {
555                 forwards = ao2_find(app->forwards, merge->to->uniqueid,
556                         OBJ_SEARCH_KEY);
557         }
558
559         if (!forwards) {
560                 return;
561         }
562
563         /* Forward the message to the app */
564         stasis_publish(app->topic, message);
565 }
566
567 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
568 {
569         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
570         size_t size;
571         int res = 0;
572
573         ast_assert(name != NULL);
574         ast_assert(handler != NULL);
575
576         ast_verb(1, "Creating Stasis app '%s'\n", name);
577
578         size = sizeof(*app) + strlen(name) + 1;
579         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
580
581         if (!app) {
582                 return NULL;
583         }
584
585         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
586                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
587                 forwards_sort, NULL);
588         if (!app->forwards) {
589                 return NULL;
590         }
591
592         app->topic = stasis_topic_create(name);
593         if (!app->topic) {
594                 return NULL;
595         }
596
597         app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
598                 bridge_merge_handler, app);
599         if (!app->bridge_merge_sub) {
600                 return NULL;
601         }
602         /* Subscription holds a reference */
603         ao2_ref(app, +1);
604
605         app->router = stasis_message_router_create(app->topic);
606         if (!app->router) {
607                 return NULL;
608         }
609
610         res |= stasis_message_router_add_cache_update(app->router,
611                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
612
613         res |= stasis_message_router_add_cache_update(app->router,
614                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
615
616         res |= stasis_message_router_add_cache_update(app->router,
617                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
618
619         res |= stasis_message_router_set_default(app->router,
620                 sub_default_handler, app);
621
622         if (res != 0) {
623                 return NULL;
624         }
625         /* Router holds a reference */
626         ao2_ref(app, +1);
627
628         strncpy(app->name, name, size - sizeof(*app));
629         app->handler = handler;
630         ao2_ref(data, +1);
631         app->data = data;
632
633         ao2_ref(app, +1);
634         return app;
635 }
636
637 /*!
638  * \brief Send a message to the given application.
639  * \param app App to send the message to.
640  * \param message Message to send.
641  */
642 void app_send(struct app *app, struct ast_json *message)
643 {
644         stasis_app_cb handler;
645         RAII_VAR(void *, data, NULL, ao2_cleanup);
646
647         /* Copy off mutable state with lock held */
648         {
649                 SCOPED_AO2LOCK(lock, app);
650                 handler = app->handler;
651                 if (app->data) {
652                         ao2_ref(app->data, +1);
653                         data = app->data;
654                 }
655                 /* Name is immutable; no need to copy */
656         }
657
658         if (!handler) {
659                 ast_verb(3,
660                         "Inactive Stasis app '%s' missed message\n", app->name);
661                 return;
662         }
663
664         handler(data, app->name, message);
665 }
666
667 void app_deactivate(struct app *app)
668 {
669         SCOPED_AO2LOCK(lock, app);
670         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
671         app->handler = NULL;
672         ao2_cleanup(app->data);
673         app->data = NULL;
674 }
675
676 void app_shutdown(struct app *app)
677 {
678         SCOPED_AO2LOCK(lock, app);
679
680         ast_assert(app_is_finished(app));
681
682         stasis_message_router_unsubscribe(app->router);
683         app->router = NULL;
684         stasis_unsubscribe(app->bridge_merge_sub);
685         app->bridge_merge_sub = NULL;
686 }
687
688 int app_is_active(struct app *app)
689 {
690         SCOPED_AO2LOCK(lock, app);
691         return app->handler != NULL;
692 }
693
694 int app_is_finished(struct app *app)
695 {
696         SCOPED_AO2LOCK(lock, app);
697
698         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
699 }
700
701 void app_update(struct app *app, stasis_app_cb handler, void *data)
702 {
703         SCOPED_AO2LOCK(lock, app);
704
705         if (app->handler) {
706                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
707
708                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
709
710                 msg = ast_json_pack("{s: s, s: s}",
711                         "type", "ApplicationReplaced",
712                         "application", app->name);
713                 if (msg) {
714                         app_send(app, msg);
715                 }
716         } else {
717                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
718         }
719
720         app->handler = handler;
721         ao2_cleanup(app->data);
722         if (data) {
723                 ao2_ref(data, +1);
724         }
725         app->data = data;
726 }
727
728 const char *app_name(const struct app *app)
729 {
730         return app->name;
731 }
732
733 struct ast_json *app_to_json(const struct app *app)
734 {
735         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
736         struct ast_json *channels;
737         struct ast_json *bridges;
738         struct ast_json *endpoints;
739         struct ao2_iterator i;
740         void *obj;
741
742         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
743                 "name", app->name,
744                 "channel_ids", "bridge_ids", "endpoint_ids");
745         channels = ast_json_object_get(json, "channel_ids");
746         bridges = ast_json_object_get(json, "bridge_ids");
747         endpoints = ast_json_object_get(json, "endpoint_ids");
748
749         i = ao2_iterator_init(app->forwards, 0);
750         while ((obj = ao2_iterator_next(&i))) {
751                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
752                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
753                 int append_res = -1;
754
755                 id = ast_json_string_create(forwards->id);
756
757                 switch (forwards->forward_type) {
758                 case FORWARD_CHANNEL:
759                         append_res = ast_json_array_append(channels,
760                                 ast_json_ref(id));
761                         break;
762                 case FORWARD_BRIDGE:
763                         append_res = ast_json_array_append(bridges,
764                                 ast_json_ref(id));
765                         break;
766                 case FORWARD_ENDPOINT:
767                         append_res = ast_json_array_append(endpoints,
768                                 ast_json_ref(id));
769                         break;
770                 }
771
772                 if (append_res != 0) {
773                         ast_log(LOG_ERROR, "Error building response\n");
774                         ao2_iterator_destroy(&i);
775                         return NULL;
776                 }
777         }
778         ao2_iterator_destroy(&i);
779
780         return ast_json_ref(json);
781 }
782
783 int app_subscribe_channel(struct app *app, struct ast_channel *chan)
784 {
785         int res;
786
787         if (!app || !chan) {
788                 return -1;
789         } else {
790                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
791                 SCOPED_AO2LOCK(lock, app->forwards);
792
793                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
794                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
795                 if (!forwards) {
796                         /* Forwards not found, create one */
797                         forwards = forwards_create_channel(app, chan);
798                         if (!forwards) {
799                                 return -1;
800                         }
801
802                         res = ao2_link_flags(app->forwards, forwards,
803                                 OBJ_NOLOCK);
804                         if (!res) {
805                                 return -1;
806                         }
807                 }
808
809                 ++forwards->interested;
810                 return 0;
811         }
812 }
813
814 static int unsubscribe(struct app *app, const char *kind, const char *id)
815 {
816         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
817         SCOPED_AO2LOCK(lock, app->forwards);
818
819         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
820         if (!forwards) {
821                 ast_log(LOG_WARNING,
822                         "App '%s' not subscribed to %s '%s'\n",
823                         app->name, kind, id);
824                 return -1;
825         }
826
827         if (--forwards->interested == 0) {
828                 /* No one is interested any more; unsubscribe */
829                 forwards_unsubscribe(forwards);
830                 ao2_find(app->forwards, forwards,
831                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
832                         OBJ_NODATA);
833         }
834
835         return 0;
836 }
837
/*!
 * Unsubscribe an app from a channel, identified by the channel object.
 *
 * \retval -1 if either argument is NULL; otherwise the result of
 *         app_unsubscribe_channel_id() for the channel's unique id.
 */
int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
	}

	return -1;
}
846
/*!
 * Unsubscribe an app from a channel, identified by the channel's id.
 *
 * \retval -1 if either argument is NULL; otherwise the result of
 *         unsubscribe().
 */
int app_unsubscribe_channel_id(struct app *app, const char *channel_id)
{
	if (app && channel_id) {
		return unsubscribe(app, "channel", channel_id);
	}

	return -1;
}
855
856 int app_is_subscribed_channel_id(struct app *app, const char *channel_id)
857 {
858         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
859         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
860         return forwards != NULL;
861 }
862
863 int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
864 {
865         if (!app || !bridge) {
866                 return -1;
867         } else {
868                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
869                 SCOPED_AO2LOCK(lock, app->forwards);
870
871                 forwards = ao2_find(app->forwards, bridge->uniqueid,
872                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
873
874                 if (!forwards) {
875                         /* Forwards not found, create one */
876                         forwards = forwards_create_bridge(app, bridge);
877                         if (!forwards) {
878                                 return -1;
879                         }
880                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
881                 }
882
883                 ++forwards->interested;
884                 return 0;
885         }
886 }
887
888 int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
889 {
890         if (!app || !bridge) {
891                 return -1;
892         }
893
894         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
895 }
896
/*!
 * Unsubscribe an app from a bridge, identified by the bridge's id.
 *
 * \retval -1 if either argument is NULL; otherwise the result of
 *         unsubscribe().
 */
int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id)
{
	if (app && bridge_id) {
		return unsubscribe(app, "bridge", bridge_id);
	}

	return -1;
}
905
906 int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id)
907 {
908         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
909         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
910         return forwards != NULL;
911 }
912
913 int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint)
914 {
915         if (!app || !endpoint) {
916                 return -1;
917         } else {
918                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
919                 SCOPED_AO2LOCK(lock, app->forwards);
920
921                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
922                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
923
924                 if (!forwards) {
925                         /* Forwards not found, create one */
926                         forwards = forwards_create_endpoint(app, endpoint);
927                         if (!forwards) {
928                                 return -1;
929                         }
930                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
931                 }
932
933                 ++forwards->interested;
934                 return 0;
935         }
936 }