Removing registrar_expire from basic-pbx config
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "app.h"
29 #include "control.h"
30 #include "messaging.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/cli.h"
34 #include "asterisk/stasis_app.h"
35 #include "asterisk/stasis_bridges.h"
36 #include "asterisk/stasis_channels.h"
37 #include "asterisk/stasis_endpoints.h"
38 #include "asterisk/stasis_message_router.h"
39
/*
 * Pseudo ids under which the "all bridges", "all channels" and
 * "all endpoints" subscriptions are recorded by the forwards_create_*()
 * helpers when no specific object is given.
 */
#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"

/*! Debug flag applying to every application; accessed without locking. */
int global_debug;

/* Forward declaration: the snapshot update handlers below call this. */
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49 struct stasis_app {
50         /*! Aggregation topic for this application. */
51         struct stasis_topic *topic;
52         /*! Router for handling messages forwarded to \a topic. */
53         struct stasis_message_router *router;
54         /*! Router for handling messages to the bridge all \a topic. */
55         struct stasis_message_router *bridge_router;
56         /*! Optional router for handling endpoint messages in 'all' subscriptions */
57         struct stasis_message_router *endpoint_router;
58         /*! Container of the channel forwards to this app's topic. */
59         struct ao2_container *forwards;
60         /*! Callback function for this application. */
61         stasis_app_cb handler;
62         /*! Opaque data to hand to callback function. */
63         void *data;
64         /*! Subscription model for the application */
65         enum stasis_app_subscription_model subscription_model;
66         /*! Whether or not someone wants to see debug messages about this app */
67         int debug;
68         /*! Name of the Stasis application */
69         char name[];
70 };
71
/*! Kind of object whose topic an app_forwards entry relays. */
enum forward_type {
	FORWARD_CHANNEL,
	FORWARD_BRIDGE,
	FORWARD_ENDPOINT,
};
77
78 /*! Subscription info for a particular channel/bridge. */
79 struct app_forwards {
80         /*! Count of number of times this channel/bridge has been subscribed */
81         int interested;
82
83         /*! Forward for the regular topic */
84         struct stasis_forward *topic_forward;
85         /*! Forward for the caching topic */
86         struct stasis_forward *topic_cached_forward;
87
88         /* Type of object being forwarded */
89         enum forward_type forward_type;
90         /*! Unique id of the object being forwarded */
91         char id[];
92 };
93
94 static void forwards_dtor(void *obj)
95 {
96 #ifdef AST_DEVMODE
97         struct app_forwards *forwards = obj;
98 #endif /* AST_DEVMODE */
99
100         ast_assert(forwards->topic_forward == NULL);
101         ast_assert(forwards->topic_cached_forward == NULL);
102 }
103
104 static void forwards_unsubscribe(struct app_forwards *forwards)
105 {
106         stasis_forward_cancel(forwards->topic_forward);
107         forwards->topic_forward = NULL;
108         stasis_forward_cancel(forwards->topic_cached_forward);
109         forwards->topic_cached_forward = NULL;
110 }
111
112 static struct app_forwards *forwards_create(struct stasis_app *app,
113         const char *id)
114 {
115         struct app_forwards *forwards;
116
117         if (!app || ast_strlen_zero(id)) {
118                 return NULL;
119         }
120
121         forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
122         if (!forwards) {
123                 return NULL;
124         }
125
126         strcpy(forwards->id, id); /* SAFE */
127
128         return forwards;
129 }
130
131 /*! Forward a channel's topics to an app */
132 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
133         struct ast_channel *chan)
134 {
135         struct app_forwards *forwards;
136
137         if (!app) {
138                 return NULL;
139         }
140
141         forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
142         if (!forwards) {
143                 return NULL;
144         }
145
146         forwards->forward_type = FORWARD_CHANNEL;
147         forwards->topic_forward = stasis_forward_all(
148                 chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
149                 app->topic);
150
151         if (!forwards->topic_forward) {
152                 ao2_ref(forwards, -1);
153                 return NULL;
154         }
155
156         return forwards;
157 }
158
159 /*! Forward a bridge's topics to an app */
160 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
161         struct ast_bridge *bridge)
162 {
163         struct app_forwards *forwards;
164
165         if (!app) {
166                 return NULL;
167         }
168
169         forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
170         if (!forwards) {
171                 return NULL;
172         }
173
174         forwards->forward_type = FORWARD_BRIDGE;
175         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
176
177         if (!forwards->topic_forward && bridge) {
178                 forwards_unsubscribe(forwards);
179                 ao2_ref(forwards, -1);
180                 return NULL;
181         }
182
183         return forwards;
184 }
185
186 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
187         struct stasis_message *message)
188 {
189         struct stasis_app *app = data;
190
191         stasis_publish(app->topic, message);
192 }
193
194 /*! Forward a endpoint's topics to an app */
195 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
196         struct ast_endpoint *endpoint)
197 {
198         struct app_forwards *forwards;
199         int ret = 0;
200
201         if (!app) {
202                 return NULL;
203         }
204
205         forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
206         if (!forwards) {
207                 return NULL;
208         }
209
210         forwards->forward_type = FORWARD_ENDPOINT;
211         if (endpoint) {
212                 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
213                         app->topic);
214                 forwards->topic_cached_forward = stasis_forward_all(
215                         ast_endpoint_topic_cached(endpoint), app->topic);
216
217                 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
218                         /* Half-subscribed is a bad thing */
219                         forwards_unsubscribe(forwards);
220                         ao2_ref(forwards, -1);
221                         return NULL;
222                 }
223         } else {
224                 /* Since endpoint subscriptions also subscribe to channels, in the case
225                  * of all endpoint subscriptions, we only want messages for the endpoints.
226                  * As such, we route those particular messages and then re-publish them
227                  * on the app's topic.
228                  */
229                 ast_assert(app->endpoint_router == NULL);
230                 app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
231                 if (!app->endpoint_router) {
232                         forwards_unsubscribe(forwards);
233                         ao2_ref(forwards, -1);
234                         return NULL;
235                 }
236
237                 ret |= stasis_message_router_add(app->endpoint_router,
238                         ast_endpoint_state_type(), endpoint_state_cb, app);
239                 ret |= stasis_message_router_add(app->endpoint_router,
240                         ast_endpoint_contact_state_type(), endpoint_state_cb, app);
241
242                 if (ret) {
243                         ao2_ref(app->endpoint_router, -1);
244                         app->endpoint_router = NULL;
245                         ao2_ref(forwards, -1);
246                         return NULL;
247                 }
248         }
249
250         return forwards;
251 }
252
253 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
254 {
255         const struct app_forwards *object_left = obj_left;
256         const struct app_forwards *object_right = obj_right;
257         const char *right_key = obj_right;
258         int cmp;
259
260         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
261         case OBJ_POINTER:
262                 right_key = object_right->id;
263                 /* Fall through */
264         case OBJ_KEY:
265                 cmp = strcmp(object_left->id, right_key);
266                 break;
267         case OBJ_PARTIAL_KEY:
268                 /*
269                  * We could also use a partial key struct containing a length
270                  * so strlen() does not get called for every comparison instead.
271                  */
272                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
273                 break;
274         default:
275                 /* Sort can only work on something with a full or partial key. */
276                 ast_assert(0);
277                 cmp = 0;
278                 break;
279         }
280         return cmp;
281 }
282
283 static void app_dtor(void *obj)
284 {
285         struct stasis_app *app = obj;
286
287         ast_verb(1, "Destroying Stasis app %s\n", app->name);
288
289         ast_assert(app->router == NULL);
290         ast_assert(app->bridge_router == NULL);
291         ast_assert(app->endpoint_router == NULL);
292
293         ao2_cleanup(app->topic);
294         app->topic = NULL;
295         ao2_cleanup(app->forwards);
296         app->forwards = NULL;
297         ao2_cleanup(app->data);
298         app->data = NULL;
299 }
300
301 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
302 {
303         struct ast_multi_channel_blob *payload = stasis_message_data(message);
304         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
305         struct ast_channel *chan;
306
307         if (!snapshot) {
308                 return;
309         }
310
311         chan = ast_channel_get_by_name(snapshot->base->uniqueid);
312         if (!chan) {
313                 return;
314         }
315
316         app_subscribe_channel(app, chan);
317         ast_channel_unref(chan);
318 }
319
/*!
 * \brief Default handler for messages arriving on the app's topic.
 *
 * Dial messages are additionally passed to call_forwarded_handler(). Any
 * message that has a JSON representation is sent to the application. On
 * the subscription's final message the reference held on the app is
 * released.
 *
 * \param data The stasis_app this subscription belongs to.
 * \param sub Subscription the message arrived on.
 * \param message Message to handle.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;
	struct ast_json *json;
	/*
	 * Determine this up front but release the reference only after the
	 * last use of app below; dropping it first could destroy the app
	 * while this function is still using it.
	 */
	int is_final = stasis_subscription_final_message(sub, message);

	if (stasis_message_type(message) == ast_channel_dial_type()) {
		call_forwarded_handler(app, message);
	}

	/* By default, send any message that has a JSON representation */
	json = stasis_message_to_json(message, stasis_app_get_sanitizer());
	if (json) {
		app_send(app, json);
		ast_json_unref(json);
	}

	if (is_final) {
		ao2_cleanup(app);
	}
}
343
/*!
 * \brief Signature of the callbacks run for every channel snapshot update.
 *
 * A callback receives the old and new snapshot plus the update's timestamp
 * and returns the JSON event to send, or NULL when it has nothing to report.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
349
350 static struct ast_json *simple_channel_event(
351         const char *type,
352         struct ast_channel_snapshot *snapshot,
353         const struct timeval *tv)
354 {
355         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
356
357         if (!json_channel) {
358                 return NULL;
359         }
360
361         return ast_json_pack("{s: s, s: o, s: o}",
362                 "type", type,
363                 "timestamp", ast_json_timeval(*tv, NULL),
364                 "channel", json_channel);
365 }
366
/*! \brief Build the "ChannelCreated" event for \a snapshot at time \a tv. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelCreated", snapshot, tv);

	return event;
}
373
374 static struct ast_json *channel_destroyed_event(
375         struct ast_channel_snapshot *snapshot,
376         const struct timeval *tv)
377 {
378         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
379
380         if (!json_channel) {
381                 return NULL;
382         }
383
384         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
385                 "type", "ChannelDestroyed",
386                 "timestamp", ast_json_timeval(*tv, NULL),
387                 "cause", snapshot->hangup->cause,
388                 "cause_txt", ast_cause2str(snapshot->hangup->cause),
389                 "channel", json_channel);
390 }
391
/*! \brief Build the "ChannelStateChange" event for \a snapshot at time \a tv. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelStateChange", snapshot, tv);

	return event;
}
398
399 /*! \brief Handle channel state changes */
400 static struct ast_json *channel_state(
401         struct ast_channel_snapshot *old_snapshot,
402         struct ast_channel_snapshot *new_snapshot,
403         const struct timeval *tv)
404 {
405         struct ast_channel_snapshot *snapshot = new_snapshot ?
406                 new_snapshot : old_snapshot;
407
408         if (!old_snapshot) {
409                 return channel_created_event(snapshot, tv);
410         } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
411                 return channel_destroyed_event(snapshot, tv);
412         } else if (old_snapshot->state != new_snapshot->state) {
413                 return channel_state_change_event(snapshot, tv);
414         }
415
416         return NULL;
417 }
418
419 static struct ast_json *channel_dialplan(
420         struct ast_channel_snapshot *old_snapshot,
421         struct ast_channel_snapshot *new_snapshot,
422         const struct timeval *tv)
423 {
424         struct ast_json *json_channel;
425
426         /* No Newexten event on first channel snapshot */
427         if (!old_snapshot) {
428                 return NULL;
429         }
430
431         /* Empty application is not valid for a Newexten event */
432         if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
433                 return NULL;
434         }
435
436         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
437                 return NULL;
438         }
439
440         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
441         if (!json_channel) {
442                 return NULL;
443         }
444
445         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
446                 "type", "ChannelDialplan",
447                 "timestamp", ast_json_timeval(*tv, NULL),
448                 "dialplan_app", new_snapshot->dialplan->appl,
449                 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
450                 "channel", json_channel);
451 }
452
453 static struct ast_json *channel_callerid(
454         struct ast_channel_snapshot *old_snapshot,
455         struct ast_channel_snapshot *new_snapshot,
456         const struct timeval *tv)
457 {
458         struct ast_json *json_channel;
459
460         /* No NewCallerid event on first channel snapshot */
461         if (!old_snapshot) {
462                 return NULL;
463         }
464
465         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
466                 return NULL;
467         }
468
469         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
470         if (!json_channel) {
471                 return NULL;
472         }
473
474         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
475                 "type", "ChannelCallerId",
476                 "timestamp", ast_json_timeval(*tv, NULL),
477                 "caller_presentation", new_snapshot->caller->pres,
478                 "caller_presentation_txt", ast_describe_caller_presentation(
479                         new_snapshot->caller->pres),
480                 "channel", json_channel);
481 }
482
483 static struct ast_json *channel_connected_line(
484         struct ast_channel_snapshot *old_snapshot,
485         struct ast_channel_snapshot *new_snapshot,
486         const struct timeval *tv)
487 {
488         struct ast_json *json_channel;
489
490         /* No ChannelConnectedLine event on first channel snapshot */
491         if (!old_snapshot) {
492                 return NULL;
493         }
494
495         if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
496                 return NULL;
497         }
498
499         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
500         if (!json_channel) {
501                 return NULL;
502         }
503
504         return ast_json_pack("{s: s, s: o, s: o}",
505                 "type", "ChannelConnectedLine",
506                 "timestamp", ast_json_timeval(*tv, NULL),
507                 "channel", json_channel);
508 }
509
510 static channel_snapshot_monitor channel_monitors[] = {
511         channel_state,
512         channel_dialplan,
513         channel_callerid,
514         channel_connected_line,
515 };
516
517 static void sub_channel_update_handler(void *data,
518         struct stasis_subscription *sub,
519         struct stasis_message *message)
520 {
521         struct stasis_app *app = data;
522         struct ast_channel_snapshot_update *update = stasis_message_data(message);
523         int i;
524
525         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
526                 struct ast_json *msg;
527
528                 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
529                         stasis_message_timestamp(message));
530                 if (msg) {
531                         app_send(app, msg);
532                         ast_json_unref(msg);
533                 }
534         }
535
536         if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
537                 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
538         }
539 }
540
541 static struct ast_json *simple_endpoint_event(
542         const char *type,
543         struct ast_endpoint_snapshot *snapshot,
544         const struct timeval *tv)
545 {
546         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
547
548         if (!json_endpoint) {
549                 return NULL;
550         }
551
552         return ast_json_pack("{s: s, s: o, s: o}",
553                 "type", type,
554                 "timestamp", ast_json_timeval(*tv, NULL),
555                 "endpoint", json_endpoint);
556 }
557
/*!
 * \brief Send a "TextMessageReceived" event for a message to an endpoint.
 *
 * \param endpoint_id Endpoint id in "tech/resource" form.
 * \param json_msg JSON form of the received message.
 * \param pvt The stasis_app to notify.
 *
 * \retval 0 once the endpoint JSON was built (even if packing the event
 *         itself failed and nothing was sent).
 * \retval -1 if \a endpoint_id lacks a technology or resource part, no
 *         snapshot exists for the endpoint, or it cannot be converted to JSON.
 */
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
{
	struct stasis_app *app = pvt;
	struct ast_endpoint_snapshot *snapshot;
	struct ast_json *endpoint_json;
	struct ast_json *event;
	char *tech;
	char *resource;

	/* Split a private copy of the id at the first '/'. */
	tech = ast_strdupa(endpoint_id);
	resource = strchr(tech, '/');
	if (resource) {
		*resource++ = '\0';
	}

	if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
		return -1;
	}

	snapshot = ast_endpoint_latest_snapshot(tech, resource);
	if (!snapshot) {
		return -1;
	}

	/* The snapshot is only needed for the conversion. */
	endpoint_json = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
	ao2_ref(snapshot, -1);
	if (!endpoint_json) {
		return -1;
	}

	event = ast_json_pack("{s: s, s: o, s: o, s: o}",
		"type", "TextMessageReceived",
		"timestamp", ast_json_timeval(ast_tvnow(), NULL),
		"endpoint", endpoint_json,
		"message", ast_json_ref(json_msg));
	if (!event) {
		return 0;
	}

	app_send(app, event);
	ast_json_unref(event);
	return 0;
}
601
602 static void sub_endpoint_update_handler(void *data,
603         struct stasis_subscription *sub,
604         struct stasis_message *message)
605 {
606         struct stasis_app *app = data;
607         struct stasis_cache_update *update;
608         struct ast_endpoint_snapshot *new_snapshot;
609         struct ast_endpoint_snapshot *old_snapshot;
610         const struct timeval *tv;
611
612         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
613
614         update = stasis_message_data(message);
615
616         ast_assert(update->type == ast_endpoint_snapshot_type());
617
618         new_snapshot = stasis_message_data(update->new_snapshot);
619         old_snapshot = stasis_message_data(update->old_snapshot);
620
621         if (new_snapshot) {
622                 struct ast_json *json;
623
624                 tv = stasis_message_timestamp(update->new_snapshot);
625
626                 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
627                 if (!json) {
628                         return;
629                 }
630
631                 app_send(app, json);
632                 ast_json_unref(json);
633         }
634
635         if (!new_snapshot && old_snapshot) {
636                 unsubscribe(app, "endpoint", old_snapshot->id, 1);
637         }
638 }
639
640 static struct ast_json *simple_bridge_event(
641         const char *type,
642         struct ast_bridge_snapshot *snapshot,
643         const struct timeval *tv)
644 {
645         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
646         if (!json_bridge) {
647                 return NULL;
648         }
649
650         return ast_json_pack("{s: s, s: o, s: o}",
651                 "type", type,
652                 "timestamp", ast_json_timeval(*tv, NULL),
653                 "bridge", json_bridge);
654 }
655
656 static void sub_bridge_update_handler(void *data,
657         struct stasis_subscription *sub,
658         struct stasis_message *message)
659 {
660         struct ast_json *json = NULL;
661         struct stasis_app *app = data;
662         struct ast_bridge_snapshot_update *update;
663         const struct timeval *tv;
664
665         update = stasis_message_data(message);
666
667         tv = stasis_message_timestamp(message);
668
669         if (!update->new_snapshot) {
670                 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
671         } else if (!update->old_snapshot) {
672                 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
673         } else if (update->new_snapshot && update->old_snapshot
674                 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
675                 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
676                 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
677                         ast_json_object_set(json, "old_video_source_id",
678                                 ast_json_string_create(update->old_snapshot->video_source_id));
679                 }
680         }
681
682         if (json) {
683                 app_send(app, json);
684                 ast_json_unref(json);
685         }
686
687         if (!update->new_snapshot && update->old_snapshot) {
688                 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
689         }
690 }
691
692
693 /*! \brief Helper function for determining if the application is subscribed to a given entity */
694 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
695 {
696         struct app_forwards *forwards = NULL;
697
698         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
699         if (!forwards) {
700                 return 0;
701         }
702
703         ao2_ref(forwards, -1);
704         return 1;
705 }
706
707 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
708         struct stasis_message *message)
709 {
710         struct stasis_app *app = data;
711         struct ast_bridge_merge_message *merge;
712
713         merge = stasis_message_data(message);
714
715         /* Find out if we're subscribed to either bridge */
716         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
717                 bridge_app_subscribed(app, merge->to->uniqueid)) {
718                 /* Forward the message to the app */
719                 stasis_publish(app->topic, message);
720         }
721 }
722
723 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
724 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
725 {
726         int subscribed = 0;
727         struct ao2_iterator iter;
728         char *uniqueid;
729
730         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
731                 return 1;
732         }
733
734         iter = ao2_iterator_init(snapshot->channels, 0);
735         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
736                 if (bridge_app_subscribed(app, uniqueid)) {
737                         subscribed = 1;
738                         ao2_ref(uniqueid, -1);
739                         break;
740                 }
741         }
742         ao2_iterator_destroy(&iter);
743
744         return subscribed;
745 }
746
747 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
748         struct stasis_message *message)
749 {
750         struct stasis_app *app = data;
751         struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
752         struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
753
754         if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
755                 (bridge && bridge_app_subscribed_involved(app, bridge))) {
756                 stasis_publish(app->topic, message);
757         }
758 }
759
760 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
761         struct stasis_message *message)
762 {
763         struct stasis_app *app = data;
764         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
765         int subscribed = 0;
766
767         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->base->uniqueid);
768         if (!subscribed) {
769                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->base->uniqueid);
770         }
771         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
772                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
773         }
774         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
775                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
776         }
777
778         if (!subscribed) {
779                 switch (transfer_msg->dest_type) {
780                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
781                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
782                         break;
783                 case AST_ATTENDED_TRANSFER_DEST_LINK:
784                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->base->uniqueid);
785                         if (!subscribed) {
786                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->base->uniqueid);
787                         }
788                         break;
789                 break;
790                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
791                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
792                         if (!subscribed) {
793                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->base->uniqueid);
794                         }
795                         break;
796                 default:
797                         break;
798                 }
799         }
800
801         if (subscribed) {
802                 stasis_publish(app->topic, message);
803         }
804 }
805
/*!
 * \brief Default handler for the bridge router.
 *
 * Ignores every message except the subscription's final one, on which the
 * reference held on the app is released.
 */
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
815
816 void stasis_app_set_debug(struct stasis_app *app, int debug)
817 {
818         if (!app) {
819                 return;
820         }
821
822         app->debug = debug;
823 }
824
825 void stasis_app_set_debug_by_name(const char *app_name, int debug)
826 {
827         struct stasis_app *app = stasis_app_get_by_name(app_name);
828
829         if (!app) {
830                 return;
831         }
832
833         app->debug = debug;
834         ao2_cleanup(app);
835 }
836
837 int stasis_app_get_debug(struct stasis_app *app)
838 {
839         return (app ? app->debug : 0) || global_debug;
840 }
841
842 int stasis_app_get_debug_by_name(const char *app_name)
843 {
844         int debug_enabled = 0;
845
846         if (global_debug) {
847                 debug_enabled = 1;
848         } else {
849                 struct stasis_app *app = stasis_app_get_by_name(app_name);
850
851                 if (app) {
852                         if (app->debug) {
853                                 debug_enabled = 1;
854                         }
855                         ao2_ref(app, -1);
856                 }
857         }
858         return debug_enabled;
859 }
860
861 void stasis_app_set_global_debug(int debug)
862 {
863         global_debug = debug;
864         if (!global_debug) {
865                 struct ao2_container *app_names = stasis_app_get_all();
866                 struct ao2_iterator it_app_names;
867                 char *app_name;
868                 struct stasis_app *app;
869
870                 if (!app_names || !ao2_container_count(app_names)) {
871                         ao2_cleanup(app_names);
872                         return;
873                 }
874
875                 it_app_names = ao2_iterator_init(app_names, 0);
876                 while ((app_name = ao2_iterator_next(&it_app_names))) {
877                         if ((app = stasis_app_get_by_name(app_name))) {
878                                 stasis_app_set_debug(app, 0);
879                         }
880
881                         ao2_cleanup(app_name);
882                         ao2_cleanup(app);
883                 }
884                 ao2_iterator_cleanup(&it_app_names);
885                 ao2_cleanup(app_names);
886         }
887 }
888
889 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
890 {
891         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
892         size_t size;
893         int res = 0;
894
895         ast_assert(name != NULL);
896         ast_assert(handler != NULL);
897
898         ast_verb(1, "Creating Stasis app '%s'\n", name);
899
900         size = sizeof(*app) + strlen(name) + 1;
901         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
902         if (!app) {
903                 return NULL;
904         }
905         app->subscription_model = subscription_model;
906
907         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
908                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
909                 forwards_sort, NULL);
910         if (!app->forwards) {
911                 return NULL;
912         }
913
914         app->topic = stasis_topic_create(name);
915         if (!app->topic) {
916                 return NULL;
917         }
918
919         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
920         if (!app->bridge_router) {
921                 return NULL;
922         }
923
924         res |= stasis_message_router_add(app->bridge_router,
925                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
926
927         res |= stasis_message_router_add(app->bridge_router,
928                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
929
930         res |= stasis_message_router_add(app->bridge_router,
931                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
932
933         res |= stasis_message_router_set_default(app->bridge_router,
934                 bridge_default_handler, app);
935
936         if (res != 0) {
937                 return NULL;
938         }
939         /* Bridge router holds a reference */
940         ao2_ref(app, +1);
941
942         app->router = stasis_message_router_create(app->topic);
943         if (!app->router) {
944                 return NULL;
945         }
946
947         res |= stasis_message_router_add(app->router,
948                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
949
950         res |= stasis_message_router_add(app->router,
951                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
952
953         res |= stasis_message_router_add_cache_update(app->router,
954                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
955
956         res |= stasis_message_router_set_default(app->router,
957                 sub_default_handler, app);
958
959         if (res != 0) {
960                 return NULL;
961         }
962         /* Router holds a reference */
963         ao2_ref(app, +1);
964
965         strncpy(app->name, name, size - sizeof(*app));
966         app->handler = handler;
967         app->data = ao2_bump(data);
968
969         ao2_ref(app, +1);
970         return app;
971 }
972
973 struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
974 {
975         return app->topic;
976 }
977
978 /*!
979  * \brief Send a message to the given application.
980  * \param app App to send the message to.
981  * \param message Message to send.
982  */
983 void app_send(struct stasis_app *app, struct ast_json *message)
984 {
985         stasis_app_cb handler;
986         char eid[20];
987         void *data;
988
989         if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
990                         ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
991                 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
992                         ast_json_string_get(ast_json_object_get(message, "type")));
993         }
994
995         /* Copy off mutable state with lock held */
996         ao2_lock(app);
997         handler = app->handler;
998         data = ao2_bump(app->data);
999         ao2_unlock(app);
1000         /* Name is immutable; no need to copy */
1001
1002         if (handler) {
1003                 handler(data, app->name, message);
1004         } else {
1005                 ast_verb(3,
1006                         "Inactive Stasis app '%s' missed message\n", app->name);
1007         }
1008         ao2_cleanup(data);
1009 }
1010
1011 void app_deactivate(struct stasis_app *app)
1012 {
1013         ao2_lock(app);
1014
1015         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1016         app->handler = NULL;
1017         ao2_cleanup(app->data);
1018         app->data = NULL;
1019
1020         ao2_unlock(app);
1021 }
1022
1023 void app_shutdown(struct stasis_app *app)
1024 {
1025         ao2_lock(app);
1026
1027         ast_assert(app_is_finished(app));
1028
1029         stasis_message_router_unsubscribe(app->router);
1030         app->router = NULL;
1031         stasis_message_router_unsubscribe(app->bridge_router);
1032         app->bridge_router = NULL;
1033         stasis_message_router_unsubscribe(app->endpoint_router);
1034         app->endpoint_router = NULL;
1035
1036         ao2_unlock(app);
1037 }
1038
1039 int app_is_active(struct stasis_app *app)
1040 {
1041         int ret;
1042
1043         ao2_lock(app);
1044         ret = app->handler != NULL;
1045         ao2_unlock(app);
1046
1047         return ret;
1048 }
1049
1050 int app_is_finished(struct stasis_app *app)
1051 {
1052         int ret;
1053
1054         ao2_lock(app);
1055         ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1056         ao2_unlock(app);
1057
1058         return ret;
1059 }
1060
1061 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
1062 {
1063         ao2_lock(app);
1064         if (app->handler && app->data) {
1065                 struct ast_json *msg;
1066
1067                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1068
1069                 msg = ast_json_pack("{s: s, s: s}",
1070                         "type", "ApplicationReplaced",
1071                         "application", app->name);
1072                 if (msg) {
1073                         app_send(app, msg);
1074                         ast_json_unref(msg);
1075                 }
1076         } else {
1077                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1078         }
1079
1080         app->handler = handler;
1081         ao2_replace(app->data, data);
1082         ao2_unlock(app);
1083 }
1084
1085 const char *stasis_app_name(const struct stasis_app *app)
1086 {
1087         return app->name;
1088 }
1089
1090 static int forwards_filter_by_type(void *obj, void *arg, int flags)
1091 {
1092         struct app_forwards *forward = obj;
1093         enum forward_type *forward_type = arg;
1094
1095         if (forward->forward_type == *forward_type) {
1096                 return CMP_MATCH;
1097         }
1098
1099         return 0;
1100 }
1101
1102 void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1103 {
1104         struct ao2_iterator *channels;
1105         struct ao2_iterator *endpoints;
1106         struct ao2_iterator *bridges;
1107         struct app_forwards *forward;
1108         enum forward_type forward_type;
1109
1110         ast_cli(a->fd, "Name: %s\n"
1111                 "  Debug: %s\n"
1112                 "  Subscription Model: %s\n",
1113                 app->name,
1114                 app->debug ? "Yes" : "No",
1115                 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1116                         "Global Resource Subscription" :
1117                         "Application/Explicit Resource Subscription");
1118         ast_cli(a->fd, "  Subscriptions: %d\n", ao2_container_count(app->forwards));
1119
1120         ast_cli(a->fd, "    Channels:\n");
1121         forward_type = FORWARD_CHANNEL;
1122         channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
1123                 forwards_filter_by_type, &forward_type);
1124         if (channels) {
1125                 while ((forward = ao2_iterator_next(channels))) {
1126                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1127                         ao2_ref(forward, -1);
1128                 }
1129                 ao2_iterator_destroy(channels);
1130         }
1131
1132         ast_cli(a->fd, "    Bridges:\n");
1133         forward_type = FORWARD_BRIDGE;
1134         bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1135                 forwards_filter_by_type, &forward_type);
1136         if (bridges) {
1137                 while ((forward = ao2_iterator_next(bridges))) {
1138                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1139                         ao2_ref(forward, -1);
1140                 }
1141                 ao2_iterator_destroy(bridges);
1142         }
1143
1144         ast_cli(a->fd, "    Endpoints:\n");
1145         forward_type = FORWARD_ENDPOINT;
1146         endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
1147                 forwards_filter_by_type, &forward_type);
1148         if (endpoints) {
1149                 while ((forward = ao2_iterator_next(endpoints))) {
1150                         ast_cli(a->fd, "      %s (%d)\n", forward->id, forward->interested);
1151                         ao2_ref(forward, -1);
1152                 }
1153                 ao2_iterator_destroy(endpoints);
1154         }
1155 }
1156
1157 struct ast_json *app_to_json(const struct stasis_app *app)
1158 {
1159         struct ast_json *json;
1160         struct ast_json *channels;
1161         struct ast_json *bridges;
1162         struct ast_json *endpoints;
1163         struct ao2_iterator i;
1164         struct app_forwards *forwards;
1165
1166         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1167                 "name", app->name,
1168                 "channel_ids", "bridge_ids", "endpoint_ids");
1169         if (!json) {
1170                 return NULL;
1171         }
1172         channels = ast_json_object_get(json, "channel_ids");
1173         bridges = ast_json_object_get(json, "bridge_ids");
1174         endpoints = ast_json_object_get(json, "endpoint_ids");
1175
1176         i = ao2_iterator_init(app->forwards, 0);
1177         while ((forwards = ao2_iterator_next(&i))) {
1178                 struct ast_json *array = NULL;
1179                 int append_res;
1180
1181                 switch (forwards->forward_type) {
1182                 case FORWARD_CHANNEL:
1183                         array = channels;
1184                         break;
1185                 case FORWARD_BRIDGE:
1186                         array = bridges;
1187                         break;
1188                 case FORWARD_ENDPOINT:
1189                         array = endpoints;
1190                         break;
1191                 }
1192
1193                 /* If forward_type value is unexpected this will safely return an error. */
1194                 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1195                 ao2_ref(forwards, -1);
1196
1197                 if (append_res != 0) {
1198                         ast_log(LOG_ERROR, "Error building response\n");
1199                         ao2_iterator_destroy(&i);
1200                         ast_json_unref(json);
1201
1202                         return NULL;
1203                 }
1204         }
1205         ao2_iterator_destroy(&i);
1206
1207         return json;
1208 }
1209
1210 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1211 {
1212         struct app_forwards *forwards;
1213
1214         if (!app) {
1215                 return -1;
1216         }
1217
1218         ao2_lock(app->forwards);
1219         /* If subscribed to all, don't subscribe again */
1220         forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1221         if (forwards) {
1222                 ao2_unlock(app->forwards);
1223                 ao2_ref(forwards, -1);
1224
1225                 return 0;
1226         }
1227
1228         forwards = ao2_find(app->forwards,
1229                 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1230                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1231         if (!forwards) {
1232                 int res;
1233
1234                 /* Forwards not found, create one */
1235                 forwards = forwards_create_channel(app, chan);
1236                 if (!forwards) {
1237                         ao2_unlock(app->forwards);
1238
1239                         return -1;
1240                 }
1241
1242                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1243                 if (!res) {
1244                         ao2_unlock(app->forwards);
1245                         ao2_ref(forwards, -1);
1246
1247                         return -1;
1248                 }
1249         }
1250
1251         ++forwards->interested;
1252         ast_debug(3, "Channel '%s' is %d interested in %s\n",
1253                 chan ? ast_channel_uniqueid(chan) : "ALL",
1254                 forwards->interested,
1255                 app->name);
1256
1257         ao2_unlock(app->forwards);
1258         ao2_ref(forwards, -1);
1259
1260         return 0;
1261 }
1262
/*!
 * \brief Event-source adapter: subscribe \a app to the channel in \a obj.
 *
 * \param app Application to subscribe.
 * \param obj Passed to app_subscribe_channel() as the channel.
 *
 * \return Result of app_subscribe_channel().
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1267
1268 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1269 {
1270         struct app_forwards *forwards;
1271
1272         if (!id) {
1273                 if (!strcmp(kind, "bridge")) {
1274                         id = BRIDGE_ALL;
1275                 } else if (!strcmp(kind, "channel")) {
1276                         id = CHANNEL_ALL;
1277                 } else if (!strcmp(kind, "endpoint")) {
1278                         id = ENDPOINT_ALL;
1279                 } else {
1280                         ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1281                         return -1;
1282                 }
1283         }
1284
1285         ao2_lock(app->forwards);
1286         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1287         if (!forwards) {
1288                 ao2_unlock(app->forwards);
1289                 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1290                 return -1;
1291         }
1292         forwards->interested--;
1293
1294         ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1295         if (forwards->interested == 0 || terminate) {
1296                 /* No one is interested any more; unsubscribe */
1297                 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1298                 forwards_unsubscribe(forwards);
1299                 ao2_find(app->forwards, forwards,
1300                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1301                         OBJ_NODATA);
1302
1303                 if (!strcmp(kind, "endpoint")) {
1304                         messaging_app_unsubscribe_endpoint(app->name, id);
1305                 }
1306         }
1307         ao2_unlock(app->forwards);
1308         ao2_ref(forwards, -1);
1309
1310         return 0;
1311 }
1312
1313 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
1314 {
1315         if (!app) {
1316                 return -1;
1317         }
1318
1319         return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1320 }
1321
/*!
 * \brief Unsubscribe an application from a channel identified by id.
 *
 * \param app Application to unsubscribe.
 * \param channel_id Channel id forwarded to unsubscribe().
 *
 * \retval -1 if \a app is NULL.
 * \return Otherwise the result of unsubscribe() for kind "channel".
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	return app ? unsubscribe(app, "channel", channel_id, 0) : -1;
}
1330
1331 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1332 {
1333         struct app_forwards *forwards;
1334
1335         if (ast_strlen_zero(channel_id)) {
1336                 channel_id = CHANNEL_ALL;
1337         }
1338         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1339         ao2_cleanup(forwards);
1340
1341         return forwards != NULL;
1342 }
1343
/*!
 * \brief Event-source lookup for channels.
 *
 * \param app Unused.
 * \param id Identifier handed to ast_channel_get_by_name().
 *
 * \return Whatever ast_channel_get_by_name() returns for \a id.
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	void *chan = ast_channel_get_by_name(id);

	return chan;
}
1348
1349 struct stasis_app_event_source channel_event_source = {
1350         .scheme = "channel:",
1351         .find = channel_find,
1352         .subscribe = subscribe_channel,
1353         .unsubscribe = app_unsubscribe_channel_id,
1354         .is_subscribed = app_is_subscribed_channel_id
1355 };
1356
1357 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1358 {
1359         struct app_forwards *forwards;
1360
1361         if (!app) {
1362                 return -1;
1363         }
1364
1365         ao2_lock(app->forwards);
1366         /* If subscribed to all, don't subscribe again */
1367         forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1368         if (forwards) {
1369                 ao2_unlock(app->forwards);
1370                 ao2_ref(forwards, -1);
1371
1372                 return 0;
1373         }
1374
1375         forwards = ao2_find(app->forwards,
1376                 bridge ? bridge->uniqueid : BRIDGE_ALL,
1377                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1378         if (!forwards) {
1379                 int res;
1380
1381                 /* Forwards not found, create one */
1382                 forwards = forwards_create_bridge(app, bridge);
1383                 if (!forwards) {
1384                         ao2_unlock(app->forwards);
1385
1386                         return -1;
1387                 }
1388
1389                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1390                 if (!res) {
1391                         ao2_unlock(app->forwards);
1392                         ao2_ref(forwards, -1);
1393
1394                         return -1;
1395                 }
1396         }
1397
1398         ++forwards->interested;
1399         ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1400                 bridge ? bridge->uniqueid : "ALL",
1401                 forwards->interested,
1402                 app->name);
1403
1404         ao2_unlock(app->forwards);
1405         ao2_ref(forwards, -1);
1406
1407         return 0;
1408 }
1409
/*!
 * \brief Event-source adapter: subscribe \a app to the bridge in \a obj.
 *
 * \param app Application to subscribe.
 * \param obj Passed to app_subscribe_bridge() as the bridge.
 *
 * \return Result of app_subscribe_bridge().
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1414
1415 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1416 {
1417         if (!app) {
1418                 return -1;
1419         }
1420
1421         return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1422 }
1423
/*!
 * \brief Unsubscribe an application from a bridge identified by id.
 *
 * \param app Application to unsubscribe.
 * \param bridge_id Bridge id forwarded to unsubscribe().
 *
 * \retval -1 if \a app is NULL.
 * \return Otherwise the result of unsubscribe() for kind "bridge".
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	return app ? unsubscribe(app, "bridge", bridge_id, 0) : -1;
}
1432
1433 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1434 {
1435         struct app_forwards *forwards;
1436
1437         if (ast_strlen_zero(bridge_id)) {
1438                 bridge_id = BRIDGE_ALL;
1439         }
1440
1441         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1442         ao2_cleanup(forwards);
1443
1444         return forwards != NULL;
1445 }
1446
/*!
 * \brief Event-source lookup for bridges.
 *
 * \param app Unused.
 * \param id Identifier handed to stasis_app_bridge_find_by_id().
 *
 * \return Whatever stasis_app_bridge_find_by_id() returns for \a id.
 */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	void *bridge = stasis_app_bridge_find_by_id(id);

	return bridge;
}
1451
1452 struct stasis_app_event_source bridge_event_source = {
1453         .scheme = "bridge:",
1454         .find = bridge_find,
1455         .subscribe = subscribe_bridge,
1456         .unsubscribe = app_unsubscribe_bridge_id,
1457         .is_subscribed = app_is_subscribed_bridge_id
1458 };
1459
1460 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1461 {
1462         struct app_forwards *forwards;
1463
1464         if (!app) {
1465                 return -1;
1466         }
1467
1468         ao2_lock(app->forwards);
1469         /* If subscribed to all, don't subscribe again */
1470         forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1471         if (forwards) {
1472                 ao2_unlock(app->forwards);
1473                 ao2_ref(forwards, -1);
1474
1475                 return 0;
1476         }
1477
1478         forwards = ao2_find(app->forwards,
1479                 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1480                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
1481         if (!forwards) {
1482                 int res;
1483
1484                 /* Forwards not found, create one */
1485                 forwards = forwards_create_endpoint(app, endpoint);
1486                 if (!forwards) {
1487                         ao2_unlock(app->forwards);
1488
1489                         return -1;
1490                 }
1491
1492                 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1493                 if (!res) {
1494                         ao2_unlock(app->forwards);
1495                         ao2_ref(forwards, -1);
1496
1497                         return -1;
1498                 }
1499
1500                 /* Subscribe for messages */
1501                 messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1502         }
1503
1504         ++forwards->interested;
1505         ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1506                 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1507                 forwards->interested,
1508                 app->name);
1509
1510         ao2_unlock(app->forwards);
1511         ao2_ref(forwards, -1);
1512
1513         return 0;
1514 }
1515
/*!
 * \brief Event-source adapter: subscribe \a app to the endpoint in \a obj.
 *
 * \param app Application to subscribe.
 * \param obj Passed to app_subscribe_endpoint() as the endpoint.
 *
 * \return Result of app_subscribe_endpoint().
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1520
/*!
 * \brief Unsubscribe an application from an endpoint identified by id.
 *
 * \param app Application to unsubscribe.
 * \param endpoint_id Endpoint id forwarded to unsubscribe().
 *
 * \retval -1 if \a app is NULL.
 * \return Otherwise the result of unsubscribe() for kind "endpoint".
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	return app ? unsubscribe(app, "endpoint", endpoint_id, 0) : -1;
}
1529
1530 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1531 {
1532         struct app_forwards *forwards;
1533
1534         if (ast_strlen_zero(endpoint_id)) {
1535                 endpoint_id = ENDPOINT_ALL;
1536         }
1537         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1538         ao2_cleanup(forwards);
1539
1540         return forwards != NULL;
1541 }
1542
/*!
 * \brief Event-source lookup for endpoints.
 *
 * \param app Unused.
 * \param id Identifier handed to ast_endpoint_find_by_id().
 *
 * \return Whatever ast_endpoint_find_by_id() returns for \a id.
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	void *endpoint = ast_endpoint_find_by_id(id);

	return endpoint;
}
1547
1548 struct stasis_app_event_source endpoint_event_source = {
1549         .scheme = "endpoint:",
1550         .find = endpoint_find,
1551         .subscribe = subscribe_endpoint,
1552         .unsubscribe = app_unsubscribe_endpoint_id,
1553         .is_subscribed = app_is_subscribed_endpoint_id
1554 };
1555
1556 void stasis_app_register_event_sources(void)
1557 {
1558         stasis_app_register_event_source(&channel_event_source);
1559         stasis_app_register_event_source(&bridge_event_source);
1560         stasis_app_register_event_source(&endpoint_event_source);
1561 }
1562
1563 void stasis_app_unregister_event_sources(void)
1564 {
1565         stasis_app_unregister_event_source(&endpoint_event_source);
1566         stasis_app_unregister_event_source(&bridge_event_source);
1567         stasis_app_unregister_event_source(&channel_event_source);
1568 }