res_stasis: Enable transfers and provide events when they occur.
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_endpoints.h"
37 #include "asterisk/stasis_message_router.h"
38
39 struct stasis_app {
40         /*! Aggregation topic for this application. */
41         struct stasis_topic *topic;
42         /*! Router for handling messages forwarded to \a topic. */
43         struct stasis_message_router *router;
44         /*! Router for handling messages to the bridge all \a topic. */
45         struct stasis_message_router *bridge_router;
46         /*! Container of the channel forwards to this app's topic. */
47         struct ao2_container *forwards;
48         /*! Callback function for this application. */
49         stasis_app_cb handler;
50         /*! Opaque data to hand to callback function. */
51         void *data;
52         /*! Name of the Stasis application */
53         char name[];
54 };
55
/*! \brief Kind of object whose topics an app_forwards entry forwards. */
enum forward_type {
	/*! The forwarded object is a channel. */
	FORWARD_CHANNEL = 0,
	/*! The forwarded object is a bridge. */
	FORWARD_BRIDGE = 1,
	/*! The forwarded object is an endpoint. */
	FORWARD_ENDPOINT = 2,
};
61
62 /*! Subscription info for a particular channel/bridge. */
63 struct app_forwards {
64         /*! Count of number of times this channel/bridge has been subscribed */
65         int interested;
66
67         /*! Forward for the regular topic */
68         struct stasis_forward *topic_forward;
69         /*! Forward for the caching topic */
70         struct stasis_forward *topic_cached_forward;
71
72         /* Type of object being forwarded */
73         enum forward_type forward_type;
74         /*! Unique id of the object being forwarded */
75         char id[];
76 };
77
78 static void forwards_dtor(void *obj)
79 {
80 #ifdef AST_DEVMODE
81         struct app_forwards *forwards = obj;
82 #endif /* AST_DEVMODE */
83
84         ast_assert(forwards->topic_forward == NULL);
85         ast_assert(forwards->topic_cached_forward == NULL);
86 }
87
88 static void forwards_unsubscribe(struct app_forwards *forwards)
89 {
90         stasis_forward_cancel(forwards->topic_forward);
91         forwards->topic_forward = NULL;
92         stasis_forward_cancel(forwards->topic_cached_forward);
93         forwards->topic_cached_forward = NULL;
94 }
95
96 static struct app_forwards *forwards_create(struct stasis_app *app,
97         const char *id)
98 {
99         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
100
101         if (!app || ast_strlen_zero(id)) {
102                 return NULL;
103         }
104
105         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
106         if (!forwards) {
107                 return NULL;
108         }
109
110         strcpy(forwards->id, id);
111
112         ao2_ref(forwards, +1);
113         return forwards;
114 }
115
116 /*! Forward a channel's topics to an app */
117 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
118         struct ast_channel *chan)
119 {
120         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
121
122         if (!app || !chan) {
123                 return NULL;
124         }
125
126         forwards = forwards_create(app, ast_channel_uniqueid(chan));
127         if (!forwards) {
128                 return NULL;
129         }
130
131         forwards->forward_type = FORWARD_CHANNEL;
132         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
133                 app->topic);
134         if (!forwards->topic_forward) {
135                 return NULL;
136         }
137
138         forwards->topic_cached_forward = stasis_forward_all(
139                 ast_channel_topic_cached(chan), app->topic);
140         if (!forwards->topic_cached_forward) {
141                 /* Half-subscribed is a bad thing */
142                 stasis_forward_cancel(forwards->topic_forward);
143                 forwards->topic_forward = NULL;
144                 return NULL;
145         }
146
147         ao2_ref(forwards, +1);
148         return forwards;
149 }
150
151 /*! Forward a bridge's topics to an app */
152 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
153         struct ast_bridge *bridge)
154 {
155         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
156
157         if (!app || !bridge) {
158                 return NULL;
159         }
160
161         forwards = forwards_create(app, bridge->uniqueid);
162         if (!forwards) {
163                 return NULL;
164         }
165
166         forwards->forward_type = FORWARD_BRIDGE;
167         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
168                 app->topic);
169         if (!forwards->topic_forward) {
170                 return NULL;
171         }
172
173         forwards->topic_cached_forward = stasis_forward_all(
174                 ast_bridge_topic_cached(bridge), app->topic);
175         if (!forwards->topic_cached_forward) {
176                 /* Half-subscribed is a bad thing */
177                 stasis_forward_cancel(forwards->topic_forward);
178                 forwards->topic_forward = NULL;
179                 return NULL;
180         }
181
182         ao2_ref(forwards, +1);
183         return forwards;
184 }
185
186 /*! Forward a endpoint's topics to an app */
187 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
188         struct ast_endpoint *endpoint)
189 {
190         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
191
192         if (!app || !endpoint) {
193                 return NULL;
194         }
195
196         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
197         if (!forwards) {
198                 return NULL;
199         }
200
201         forwards->forward_type = FORWARD_ENDPOINT;
202         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
203                 app->topic);
204         if (!forwards->topic_forward) {
205                 return NULL;
206         }
207
208         forwards->topic_cached_forward = stasis_forward_all(
209                 ast_endpoint_topic_cached(endpoint), app->topic);
210         if (!forwards->topic_cached_forward) {
211                 /* Half-subscribed is a bad thing */
212                 stasis_forward_cancel(forwards->topic_forward);
213                 forwards->topic_forward = NULL;
214                 return NULL;
215         }
216
217         ao2_ref(forwards, +1);
218         return forwards;
219 }
220
221 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
222 {
223         const struct app_forwards *object_left = obj_left;
224         const struct app_forwards *object_right = obj_right;
225         const char *right_key = obj_right;
226         int cmp;
227
228         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
229         case OBJ_POINTER:
230                 right_key = object_right->id;
231                 /* Fall through */
232         case OBJ_KEY:
233                 cmp = strcmp(object_left->id, right_key);
234                 break;
235         case OBJ_PARTIAL_KEY:
236                 /*
237                  * We could also use a partial key struct containing a length
238                  * so strlen() does not get called for every comparison instead.
239                  */
240                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
241                 break;
242         default:
243                 /* Sort can only work on something with a full or partial key. */
244                 ast_assert(0);
245                 cmp = 0;
246                 break;
247         }
248         return cmp;
249 }
250
251 static void app_dtor(void *obj)
252 {
253         struct stasis_app *app = obj;
254
255         ast_verb(1, "Destroying Stasis app %s\n", app->name);
256
257         ast_assert(app->router == NULL);
258         ast_assert(app->bridge_router == NULL);
259
260         ao2_cleanup(app->topic);
261         app->topic = NULL;
262         ao2_cleanup(app->forwards);
263         app->forwards = NULL;
264         ao2_cleanup(app->data);
265         app->data = NULL;
266 }
267
268 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
269 {
270         struct ast_multi_channel_blob *payload = stasis_message_data(message);
271         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
272         struct ast_channel *chan;
273
274         if (!snapshot) {
275                 return;
276         }
277
278         chan = ast_channel_get_by_name(snapshot->uniqueid);
279         if (!chan) {
280                 return;
281         }
282
283         app_subscribe_channel(app, chan);
284         ast_channel_unref(chan);
285 }
286
/*!
 * \brief Default handler for messages arriving on the app's topic router.
 * \param data The stasis_app the router was created for.
 * \param sub Subscription delivering the message.
 * \param message Message being delivered.
 *
 * Dial messages are additionally inspected for a forwarded channel. Any
 * message that has a JSON representation is sent to the application.
 *
 * On the subscription's final message the reference the router holds on the
 * app is released. That release happens last, so \a app is never touched
 * after the reference that keeps it alive may have been dropped.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;
	struct ast_json *json;
	int is_final = stasis_subscription_final_message(sub, message);

	if (stasis_message_type(message) == ast_channel_dial_type()) {
		call_forwarded_handler(app, message);
	}

	/* By default, send any message that has a JSON representation */
	json = stasis_message_to_json(message, stasis_app_get_sanitizer());
	if (json) {
		app_send(app, json);
		ast_json_unref(json);
	}

	if (is_final) {
		/* Drop the router's reference only once we are done with app. */
		ao2_cleanup(app);
	}
}
309
/*!
 * \brief Signature of the callbacks run on every channel snapshot update.
 *
 * Each callback receives the previous and current snapshot (either may be
 * NULL) plus the update timestamp, and returns a JSON event or NULL when it
 * has nothing to report.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot, const struct timeval *tv);
315
316 static struct ast_json *simple_channel_event(
317         const char *type,
318         struct ast_channel_snapshot *snapshot,
319         const struct timeval *tv)
320 {
321         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
322
323         if (!json_channel) {
324                 return NULL;
325         }
326
327         return ast_json_pack("{s: s, s: o, s: o}",
328                 "type", type,
329                 "timestamp", ast_json_timeval(*tv, NULL),
330                 "channel", json_channel);
331 }
332
/*! \brief Build the "ChannelCreated" event for \a snapshot at time \a tv. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelCreated", snapshot, tv);

	return event;
}
339
340 static struct ast_json *channel_destroyed_event(
341         struct ast_channel_snapshot *snapshot,
342         const struct timeval *tv)
343 {
344         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
345
346         if (!json_channel) {
347                 return NULL;
348         }
349
350         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
351                 "type", "ChannelDestroyed",
352                 "timestamp", ast_json_timeval(*tv, NULL),
353                 "cause", snapshot->hangupcause,
354                 "cause_txt", ast_cause2str(snapshot->hangupcause),
355                 "channel", json_channel);
356 }
357
/*! \brief Build the "ChannelStateChange" event for \a snapshot at time \a tv. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	struct ast_json *event = simple_channel_event("ChannelStateChange", snapshot, tv);

	return event;
}
364
365 /*! \brief Handle channel state changes */
366 static struct ast_json *channel_state(
367         struct ast_channel_snapshot *old_snapshot,
368         struct ast_channel_snapshot *new_snapshot,
369         const struct timeval *tv)
370 {
371         struct ast_channel_snapshot *snapshot = new_snapshot ?
372                 new_snapshot : old_snapshot;
373
374         if (!old_snapshot) {
375                 return channel_created_event(snapshot, tv);
376         } else if (!new_snapshot) {
377                 return channel_destroyed_event(snapshot, tv);
378         } else if (old_snapshot->state != new_snapshot->state) {
379                 return channel_state_change_event(snapshot, tv);
380         }
381
382         return NULL;
383 }
384
385 static struct ast_json *channel_dialplan(
386         struct ast_channel_snapshot *old_snapshot,
387         struct ast_channel_snapshot *new_snapshot,
388         const struct timeval *tv)
389 {
390         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
391         struct ast_json *json_channel;
392
393         /* No Newexten event on cache clear or first event */
394         if (!old_snapshot || !new_snapshot) {
395                 return NULL;
396         }
397
398         /* Empty application is not valid for a Newexten event */
399         if (ast_strlen_zero(new_snapshot->appl)) {
400                 return NULL;
401         }
402
403         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
404                 return NULL;
405         }
406
407         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
408         if (!json_channel) {
409                 return NULL;
410         }
411
412         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
413                 "type", "ChannelDialplan",
414                 "timestamp", ast_json_timeval(*tv, NULL),
415                 "dialplan_app", new_snapshot->appl,
416                 "dialplan_app_data", new_snapshot->data,
417                 "channel", json_channel);
418 }
419
420 static struct ast_json *channel_callerid(
421         struct ast_channel_snapshot *old_snapshot,
422         struct ast_channel_snapshot *new_snapshot,
423         const struct timeval *tv)
424 {
425         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
426         struct ast_json *json_channel;
427
428         /* No NewCallerid event on cache clear or first event */
429         if (!old_snapshot || !new_snapshot) {
430                 return NULL;
431         }
432
433         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
434                 return NULL;
435         }
436
437         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
438         if (!json_channel) {
439                 return NULL;
440         }
441
442         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
443                 "type", "ChannelCallerId",
444                 "timestamp", ast_json_timeval(*tv, NULL),
445                 "caller_presentation", new_snapshot->caller_pres,
446                 "caller_presentation_txt", ast_describe_caller_presentation(
447                         new_snapshot->caller_pres),
448                 "channel", json_channel);
449 }
450
451 static channel_snapshot_monitor channel_monitors[] = {
452         channel_state,
453         channel_dialplan,
454         channel_callerid
455 };
456
457 static void sub_channel_update_handler(void *data,
458         struct stasis_subscription *sub,
459         struct stasis_message *message)
460 {
461         struct stasis_app *app = data;
462         struct stasis_cache_update *update;
463         struct ast_channel_snapshot *new_snapshot;
464         struct ast_channel_snapshot *old_snapshot;
465         const struct timeval *tv;
466         int i;
467
468         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
469
470         update = stasis_message_data(message);
471
472         ast_assert(update->type == ast_channel_snapshot_type());
473
474         new_snapshot = stasis_message_data(update->new_snapshot);
475         old_snapshot = stasis_message_data(update->old_snapshot);
476
477         /* Pull timestamp from the new snapshot, or from the update message
478          * when there isn't one. */
479         tv = update->new_snapshot ?
480                 stasis_message_timestamp(update->new_snapshot) :
481                 stasis_message_timestamp(message);
482
483         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
484                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
485
486                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
487                 if (msg) {
488                         app_send(app, msg);
489                 }
490         }
491 }
492
493 static struct ast_json *simple_endpoint_event(
494         const char *type,
495         struct ast_endpoint_snapshot *snapshot,
496         const struct timeval *tv)
497 {
498         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
499
500         if (!json_endpoint) {
501                 return NULL;
502         }
503
504         return ast_json_pack("{s: s, s: o, s: o}",
505                 "type", type,
506                 "timestamp", ast_json_timeval(*tv, NULL),
507                 "endpoint", json_endpoint);
508 }
509
510 static void sub_endpoint_update_handler(void *data,
511         struct stasis_subscription *sub,
512         struct stasis_message *message)
513 {
514         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
515         struct stasis_app *app = data;
516         struct stasis_cache_update *update;
517         struct ast_endpoint_snapshot *new_snapshot;
518         const struct timeval *tv;
519
520         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
521
522         update = stasis_message_data(message);
523
524         ast_assert(update->type == ast_endpoint_snapshot_type());
525
526         new_snapshot = stasis_message_data(update->new_snapshot);
527         tv = update->new_snapshot ?
528                 stasis_message_timestamp(update->new_snapshot) :
529                 stasis_message_timestamp(message);
530
531         json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
532
533         if (!json) {
534                 return;
535         }
536
537         app_send(app, json);
538 }
539
540 static struct ast_json *simple_bridge_event(
541         const char *type,
542         struct ast_bridge_snapshot *snapshot,
543         const struct timeval *tv)
544 {
545         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
546         if (!json_bridge) {
547                 return NULL;
548         }
549
550         return ast_json_pack("{s: s, s: o, s: o}",
551                 "type", type,
552                 "timestamp", ast_json_timeval(*tv, NULL),
553                 "bridge", json_bridge);
554 }
555
556 static void sub_bridge_update_handler(void *data,
557         struct stasis_subscription *sub,
558         struct stasis_message *message)
559 {
560         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
561         struct stasis_app *app = data;
562         struct stasis_cache_update *update;
563         struct ast_bridge_snapshot *new_snapshot;
564         struct ast_bridge_snapshot *old_snapshot;
565         const struct timeval *tv;
566
567         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
568
569         update = stasis_message_data(message);
570
571         ast_assert(update->type == ast_bridge_snapshot_type());
572
573         new_snapshot = stasis_message_data(update->new_snapshot);
574         old_snapshot = stasis_message_data(update->old_snapshot);
575         tv = update->new_snapshot ?
576                 stasis_message_timestamp(update->new_snapshot) :
577                 stasis_message_timestamp(message);
578
579         if (!new_snapshot) {
580                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
581         } else if (!old_snapshot) {
582                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
583         }
584
585         if (!json) {
586                 return;
587         }
588
589         app_send(app, json);
590 }
591
592
593 /*! \brief Helper function for determining if the application is subscribed to a given entity */
594 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
595 {
596         struct app_forwards *forwards = NULL;
597
598         forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
599         if (!forwards) {
600                 return 0;
601         }
602
603         ao2_ref(forwards, -1);
604         return 1;
605 }
606
607 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
608         struct stasis_message *message)
609 {
610         struct stasis_app *app = data;
611         struct ast_bridge_merge_message *merge;
612
613         merge = stasis_message_data(message);
614
615         /* Find out if we're subscribed to either bridge */
616         if (bridge_app_subscribed(app, merge->from->uniqueid) ||
617                 bridge_app_subscribed(app, merge->to->uniqueid)) {
618                 /* Forward the message to the app */
619                 stasis_publish(app->topic, message);
620         }
621 }
622
623 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
624 static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
625 {
626         int subscribed = 0;
627         struct ao2_iterator iter;
628         char *uniqueid;
629
630         if (bridge_app_subscribed(app, snapshot->uniqueid)) {
631                 return 1;
632         }
633
634         iter = ao2_iterator_init(snapshot->channels, 0);
635         for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
636                 if (bridge_app_subscribed(app, uniqueid)) {
637                         subscribed = 1;
638                         ao2_ref(uniqueid, -1);
639                         break;
640                 }
641         }
642         ao2_iterator_destroy(&iter);
643
644         return subscribed;
645 }
646
647 static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
648         struct stasis_message *message)
649 {
650         struct stasis_app *app = data;
651         struct ast_bridge_blob *blob = stasis_message_data(message);
652
653         if (bridge_app_subscribed(app, blob->channel->uniqueid) ||
654                 bridge_app_subscribed_involved(app, blob->bridge)) {
655                 stasis_publish(app->topic, message);
656         }
657 }
658
659 static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
660         struct stasis_message *message)
661 {
662         struct stasis_app *app = data;
663         struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
664         int subscribed = 0;
665
666         subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
667         if (!subscribed) {
668                 subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
669         }
670         if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
671                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
672         }
673         if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
674                 subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
675         }
676
677         if (!subscribed) {
678                 switch (transfer_msg->dest_type) {
679                 case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
680                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
681                         break;
682                 case AST_ATTENDED_TRANSFER_DEST_LINK:
683                         subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
684                         if (!subscribed) {
685                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
686                         }
687                         break;
688                 break;
689                 case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
690                         subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
691                         if (!subscribed) {
692                                 subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
693                         }
694                         break;
695                 default:
696                         break;
697                 }
698         }
699
700         if (subscribed) {
701                 stasis_publish(app->topic, message);
702         }
703 }
704
/*!
 * \brief Default handler for the bridge router.
 * \param data The stasis_app the bridge router was created for.
 * \param sub Subscription delivering the message.
 * \param message Message being delivered.
 *
 * Ignores everything except the subscription's final message, on which the
 * reference the bridge router holds on the app is released.
 */
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(app);
}
714
715 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
716 {
717         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
718         size_t size;
719         int res = 0;
720
721         ast_assert(name != NULL);
722         ast_assert(handler != NULL);
723
724         ast_verb(1, "Creating Stasis app '%s'\n", name);
725
726         size = sizeof(*app) + strlen(name) + 1;
727         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
728
729         if (!app) {
730                 return NULL;
731         }
732
733         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
734                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
735                 forwards_sort, NULL);
736         if (!app->forwards) {
737                 return NULL;
738         }
739
740         app->topic = stasis_topic_create(name);
741         if (!app->topic) {
742                 return NULL;
743         }
744
745         app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
746         if (!app->bridge_router) {
747                 return NULL;
748         }
749
750         res |= stasis_message_router_add(app->bridge_router,
751                 ast_bridge_merge_message_type(), bridge_merge_handler, app);
752
753         res |= stasis_message_router_add(app->bridge_router,
754                 ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
755
756         res |= stasis_message_router_add(app->bridge_router,
757                 ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
758
759         res |= stasis_message_router_set_default(app->bridge_router,
760                 bridge_default_handler, app);
761
762         if (res != 0) {
763                 return NULL;
764         }
765         /* Bridge router holds a reference */
766         ao2_ref(app, +1);
767
768         app->router = stasis_message_router_create(app->topic);
769         if (!app->router) {
770                 return NULL;
771         }
772
773         res |= stasis_message_router_add_cache_update(app->router,
774                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
775
776         res |= stasis_message_router_add_cache_update(app->router,
777                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
778
779         res |= stasis_message_router_add_cache_update(app->router,
780                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
781
782         res |= stasis_message_router_set_default(app->router,
783                 sub_default_handler, app);
784
785         if (res != 0) {
786                 return NULL;
787         }
788         /* Router holds a reference */
789         ao2_ref(app, +1);
790
791         strncpy(app->name, name, size - sizeof(*app));
792         app->handler = handler;
793         ao2_ref(data, +1);
794         app->data = data;
795
796         ao2_ref(app, +1);
797         return app;
798 }
799
800 /*!
801  * \brief Send a message to the given application.
802  * \param app App to send the message to.
803  * \param message Message to send.
804  */
805 void app_send(struct stasis_app *app, struct ast_json *message)
806 {
807         stasis_app_cb handler;
808         RAII_VAR(void *, data, NULL, ao2_cleanup);
809
810         /* Copy off mutable state with lock held */
811         {
812                 SCOPED_AO2LOCK(lock, app);
813                 handler = app->handler;
814                 if (app->data) {
815                         ao2_ref(app->data, +1);
816                         data = app->data;
817                 }
818                 /* Name is immutable; no need to copy */
819         }
820
821         if (!handler) {
822                 ast_verb(3,
823                         "Inactive Stasis app '%s' missed message\n", app->name);
824                 return;
825         }
826
827         handler(data, app->name, message);
828 }
829
830 void app_deactivate(struct stasis_app *app)
831 {
832         SCOPED_AO2LOCK(lock, app);
833         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
834         app->handler = NULL;
835         ao2_cleanup(app->data);
836         app->data = NULL;
837 }
838
839 void app_shutdown(struct stasis_app *app)
840 {
841         SCOPED_AO2LOCK(lock, app);
842
843         ast_assert(app_is_finished(app));
844
845         stasis_message_router_unsubscribe(app->router);
846         app->router = NULL;
847         stasis_message_router_unsubscribe(app->bridge_router);
848         app->bridge_router = NULL;
849 }
850
851 int app_is_active(struct stasis_app *app)
852 {
853         SCOPED_AO2LOCK(lock, app);
854         return app->handler != NULL;
855 }
856
857 int app_is_finished(struct stasis_app *app)
858 {
859         SCOPED_AO2LOCK(lock, app);
860
861         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
862 }
863
864 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
865 {
866         SCOPED_AO2LOCK(lock, app);
867
868         if (app->handler) {
869                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
870
871                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
872
873                 msg = ast_json_pack("{s: s, s: s}",
874                         "type", "ApplicationReplaced",
875                         "application", app->name);
876                 if (msg) {
877                         app_send(app, msg);
878                 }
879         } else {
880                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
881         }
882
883         app->handler = handler;
884         ao2_cleanup(app->data);
885         if (data) {
886                 ao2_ref(data, +1);
887         }
888         app->data = data;
889 }
890
891 const char *app_name(const struct stasis_app *app)
892 {
893         return app->name;
894 }
895
896 struct ast_json *app_to_json(const struct stasis_app *app)
897 {
898         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
899         struct ast_json *channels;
900         struct ast_json *bridges;
901         struct ast_json *endpoints;
902         struct ao2_iterator i;
903         void *obj;
904
905         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
906                 "name", app->name,
907                 "channel_ids", "bridge_ids", "endpoint_ids");
908         channels = ast_json_object_get(json, "channel_ids");
909         bridges = ast_json_object_get(json, "bridge_ids");
910         endpoints = ast_json_object_get(json, "endpoint_ids");
911
912         i = ao2_iterator_init(app->forwards, 0);
913         while ((obj = ao2_iterator_next(&i))) {
914                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
915                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
916                 int append_res = -1;
917
918                 id = ast_json_string_create(forwards->id);
919
920                 switch (forwards->forward_type) {
921                 case FORWARD_CHANNEL:
922                         append_res = ast_json_array_append(channels,
923                                 ast_json_ref(id));
924                         break;
925                 case FORWARD_BRIDGE:
926                         append_res = ast_json_array_append(bridges,
927                                 ast_json_ref(id));
928                         break;
929                 case FORWARD_ENDPOINT:
930                         append_res = ast_json_array_append(endpoints,
931                                 ast_json_ref(id));
932                         break;
933                 }
934
935                 if (append_res != 0) {
936                         ast_log(LOG_ERROR, "Error building response\n");
937                         ao2_iterator_destroy(&i);
938                         return NULL;
939                 }
940         }
941         ao2_iterator_destroy(&i);
942
943         return ast_json_ref(json);
944 }
945
946 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
947 {
948         int res;
949
950         if (!app || !chan) {
951                 return -1;
952         } else {
953                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
954                 SCOPED_AO2LOCK(lock, app->forwards);
955
956                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
957                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
958                 if (!forwards) {
959                         /* Forwards not found, create one */
960                         forwards = forwards_create_channel(app, chan);
961                         if (!forwards) {
962                                 return -1;
963                         }
964
965                         res = ao2_link_flags(app->forwards, forwards,
966                                 OBJ_NOLOCK);
967                         if (!res) {
968                                 return -1;
969                         }
970                 }
971
972                 ++forwards->interested;
973                 return 0;
974         }
975 }
976
/*!
 * \brief Event-source adapter: subscribe to a channel passed as \c void*.
 *
 * \param app Application.
 * \param obj Channel object, forwarded to app_subscribe_channel().
 * \return Result of app_subscribe_channel().
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
981
982 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
983 {
984         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
985         SCOPED_AO2LOCK(lock, app->forwards);
986
987         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
988         if (!forwards) {
989                 ast_log(LOG_WARNING,
990                         "App '%s' not subscribed to %s '%s'\n",
991                         app->name, kind, id);
992                 return -1;
993         }
994
995         if (--forwards->interested == 0) {
996                 /* No one is interested any more; unsubscribe */
997                 forwards_unsubscribe(forwards);
998                 ao2_find(app->forwards, forwards,
999                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
1000                         OBJ_NODATA);
1001         }
1002
1003         return 0;
1004 }
1005
/*!
 * \brief Unsubscribe an application from a channel.
 *
 * \param app Application.
 * \param chan Channel to unsubscribe from.
 * \return Result of app_unsubscribe_channel_id() for the channel's
 *         unique id.
 * \return -1 if either argument is \c NULL.
 */
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
	}

	return -1;
}
1014
/*!
 * \brief Unsubscribe an application from a channel, by channel id.
 *
 * \param app Application.
 * \param channel_id Id of the channel to unsubscribe from.
 * \return Result of unsubscribe().
 * \return -1 if either argument is \c NULL.
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	if (app && channel_id) {
		return unsubscribe(app, "channel", channel_id);
	}

	return -1;
}
1023
1024 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1025 {
1026         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1027         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1028         return forwards != NULL;
1029 }
1030
/*!
 * \brief Event-source lookup callback for channels.
 *
 * \param app Application (unused).
 * \param id Identifier handed to ast_channel_get_by_name().
 * \return Whatever ast_channel_get_by_name() returns for \a id.
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	void *found = ast_channel_get_by_name(id);

	return found;
}
1035
1036 struct stasis_app_event_source channel_event_source = {
1037         .scheme = "channel:",
1038         .find = channel_find,
1039         .subscribe = subscribe_channel,
1040         .unsubscribe = app_unsubscribe_channel_id,
1041         .is_subscribed = app_is_subscribed_channel_id
1042 };
1043
1044 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1045 {
1046         if (!app || !bridge) {
1047                 return -1;
1048         } else {
1049                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1050                 SCOPED_AO2LOCK(lock, app->forwards);
1051
1052                 forwards = ao2_find(app->forwards, bridge->uniqueid,
1053                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1054
1055                 if (!forwards) {
1056                         /* Forwards not found, create one */
1057                         forwards = forwards_create_bridge(app, bridge);
1058                         if (!forwards) {
1059                                 return -1;
1060                         }
1061                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1062                 }
1063
1064                 ++forwards->interested;
1065                 return 0;
1066         }
1067 }
1068
/*!
 * \brief Event-source adapter: subscribe to a bridge passed as \c void*.
 *
 * \param app Application.
 * \param obj Bridge object, forwarded to app_subscribe_bridge().
 * \return Result of app_subscribe_bridge().
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1073
1074 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1075 {
1076         if (!app || !bridge) {
1077                 return -1;
1078         }
1079
1080         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
1081 }
1082
/*!
 * \brief Unsubscribe an application from a bridge, by bridge id.
 *
 * \param app Application.
 * \param bridge_id Id of the bridge to unsubscribe from.
 * \return Result of unsubscribe().
 * \return -1 if either argument is \c NULL.
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	if (app && bridge_id) {
		return unsubscribe(app, "bridge", bridge_id);
	}

	return -1;
}
1091
1092 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1093 {
1094         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1095         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1096         return forwards != NULL;
1097 }
1098
/*!
 * \brief Event-source lookup callback for bridges.
 *
 * \param app Application (unused).
 * \param id Identifier handed to stasis_app_bridge_find_by_id().
 * \return Whatever stasis_app_bridge_find_by_id() returns for \a id.
 */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	void *found = stasis_app_bridge_find_by_id(id);

	return found;
}
1103
1104 struct stasis_app_event_source bridge_event_source = {
1105         .scheme = "bridge:",
1106         .find = bridge_find,
1107         .subscribe = subscribe_bridge,
1108         .unsubscribe = app_unsubscribe_bridge_id,
1109         .is_subscribed = app_is_subscribed_bridge_id
1110 };
1111
1112 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1113 {
1114         if (!app || !endpoint) {
1115                 return -1;
1116         } else {
1117                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1118                 SCOPED_AO2LOCK(lock, app->forwards);
1119
1120                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
1121                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1122
1123                 if (!forwards) {
1124                         /* Forwards not found, create one */
1125                         forwards = forwards_create_endpoint(app, endpoint);
1126                         if (!forwards) {
1127                                 return -1;
1128                         }
1129                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1130                 }
1131
1132                 ++forwards->interested;
1133                 return 0;
1134         }
1135 }
1136
/*!
 * \brief Event-source adapter: subscribe to an endpoint passed as \c void*.
 *
 * \param app Application.
 * \param obj Endpoint object, forwarded to app_subscribe_endpoint().
 * \return Result of app_subscribe_endpoint().
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1141
/*!
 * \brief Unsubscribe an application from an endpoint, by endpoint id.
 *
 * \param app Application.
 * \param endpoint_id Id of the endpoint to unsubscribe from.
 * \return Result of unsubscribe().
 * \return -1 if either argument is \c NULL.
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	if (app && endpoint_id) {
		return unsubscribe(app, "endpoint", endpoint_id);
	}

	return -1;
}
1150
1151 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1152 {
1153         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1154         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1155         return forwards != NULL;
1156 }
1157
/*!
 * \brief Event-source lookup callback for endpoints.
 *
 * \param app Application (unused).
 * \param id Identifier handed to ast_endpoint_find_by_id().
 * \return Whatever ast_endpoint_find_by_id() returns for \a id.
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	void *found = ast_endpoint_find_by_id(id);

	return found;
}
1162
1163 struct stasis_app_event_source endpoint_event_source = {
1164         .scheme = "endpoint:",
1165         .find = endpoint_find,
1166         .subscribe = subscribe_endpoint,
1167         .unsubscribe = app_unsubscribe_endpoint_id,
1168         .is_subscribed = app_is_subscribed_endpoint_id
1169 };
1170
1171 void stasis_app_register_event_sources(void)
1172 {
1173         stasis_app_register_event_source(&channel_event_source);
1174         stasis_app_register_event_source(&bridge_event_source);
1175         stasis_app_register_event_source(&endpoint_event_source);
1176 }
1177
1178 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1179 {
1180         return obj == &endpoint_event_source ||
1181                 obj == &bridge_event_source ||
1182                 obj == &channel_event_source;
1183 }
1184
1185 void stasis_app_unregister_event_sources(void)
1186 {
1187         stasis_app_unregister_event_source(&endpoint_event_source);
1188         stasis_app_unregister_event_source(&bridge_event_source);
1189         stasis_app_unregister_event_source(&channel_event_source);
1190 }
1191
1192