8e9872aec35ae9e03dcf931c09caa50931234f69
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_endpoints.h"
37 #include "asterisk/stasis_message_router.h"
38
39 struct stasis_app {
40         /*! Aggregation topic for this application. */
41         struct stasis_topic *topic;
42         /*! Router for handling messages forwarded to \a topic. */
43         struct stasis_message_router *router;
44         /*! Subscription to watch for bridge merge messages */
45         struct stasis_subscription *bridge_merge_sub;
46         /*! Container of the channel forwards to this app's topic. */
47         struct ao2_container *forwards;
48         /*! Callback function for this application. */
49         stasis_app_cb handler;
50         /*! Opaque data to hand to callback function. */
51         void *data;
52         /*! Name of the Stasis application */
53         char name[];
54 };
55
56 enum forward_type {
57         FORWARD_CHANNEL,
58         FORWARD_BRIDGE,
59         FORWARD_ENDPOINT,
60 };
61
62 /*! Subscription info for a particular channel/bridge. */
63 struct app_forwards {
64         /*! Count of number of times this channel/bridge has been subscribed */
65         int interested;
66
67         /*! Forward for the regular topic */
68         struct stasis_forward *topic_forward;
69         /*! Forward for the caching topic */
70         struct stasis_forward *topic_cached_forward;
71
72         /* Type of object being forwarded */
73         enum forward_type forward_type;
74         /*! Unique id of the object being forwarded */
75         char id[];
76 };
77
78 static void forwards_dtor(void *obj)
79 {
80 #ifdef AST_DEVMODE
81         struct app_forwards *forwards = obj;
82 #endif /* AST_DEVMODE */
83
84         ast_assert(forwards->topic_forward == NULL);
85         ast_assert(forwards->topic_cached_forward == NULL);
86 }
87
88 static void forwards_unsubscribe(struct app_forwards *forwards)
89 {
90         stasis_forward_cancel(forwards->topic_forward);
91         forwards->topic_forward = NULL;
92         stasis_forward_cancel(forwards->topic_cached_forward);
93         forwards->topic_cached_forward = NULL;
94 }
95
96 static struct app_forwards *forwards_create(struct stasis_app *app,
97         const char *id)
98 {
99         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
100
101         if (!app || ast_strlen_zero(id)) {
102                 return NULL;
103         }
104
105         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
106         if (!forwards) {
107                 return NULL;
108         }
109
110         strcpy(forwards->id, id);
111
112         ao2_ref(forwards, +1);
113         return forwards;
114 }
115
116 /*! Forward a channel's topics to an app */
117 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
118         struct ast_channel *chan)
119 {
120         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
121
122         if (!app || !chan) {
123                 return NULL;
124         }
125
126         forwards = forwards_create(app, ast_channel_uniqueid(chan));
127         if (!forwards) {
128                 return NULL;
129         }
130
131         forwards->forward_type = FORWARD_CHANNEL;
132         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
133                 app->topic);
134         if (!forwards->topic_forward) {
135                 return NULL;
136         }
137
138         forwards->topic_cached_forward = stasis_forward_all(
139                 ast_channel_topic_cached(chan), app->topic);
140         if (!forwards->topic_cached_forward) {
141                 /* Half-subscribed is a bad thing */
142                 stasis_forward_cancel(forwards->topic_forward);
143                 forwards->topic_forward = NULL;
144                 return NULL;
145         }
146
147         ao2_ref(forwards, +1);
148         return forwards;
149 }
150
151 /*! Forward a bridge's topics to an app */
152 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
153         struct ast_bridge *bridge)
154 {
155         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
156
157         if (!app || !bridge) {
158                 return NULL;
159         }
160
161         forwards = forwards_create(app, bridge->uniqueid);
162         if (!forwards) {
163                 return NULL;
164         }
165
166         forwards->forward_type = FORWARD_BRIDGE;
167         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
168                 app->topic);
169         if (!forwards->topic_forward) {
170                 return NULL;
171         }
172
173         forwards->topic_cached_forward = stasis_forward_all(
174                 ast_bridge_topic_cached(bridge), app->topic);
175         if (!forwards->topic_cached_forward) {
176                 /* Half-subscribed is a bad thing */
177                 stasis_forward_cancel(forwards->topic_forward);
178                 forwards->topic_forward = NULL;
179                 return NULL;
180         }
181
182         ao2_ref(forwards, +1);
183         return forwards;
184 }
185
186 /*! Forward a endpoint's topics to an app */
187 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
188         struct ast_endpoint *endpoint)
189 {
190         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
191
192         if (!app || !endpoint) {
193                 return NULL;
194         }
195
196         forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
197         if (!forwards) {
198                 return NULL;
199         }
200
201         forwards->forward_type = FORWARD_ENDPOINT;
202         forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
203                 app->topic);
204         if (!forwards->topic_forward) {
205                 return NULL;
206         }
207
208         forwards->topic_cached_forward = stasis_forward_all(
209                 ast_endpoint_topic_cached(endpoint), app->topic);
210         if (!forwards->topic_cached_forward) {
211                 /* Half-subscribed is a bad thing */
212                 stasis_forward_cancel(forwards->topic_forward);
213                 forwards->topic_forward = NULL;
214                 return NULL;
215         }
216
217         ao2_ref(forwards, +1);
218         return forwards;
219 }
220
221 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
222 {
223         const struct app_forwards *object_left = obj_left;
224         const struct app_forwards *object_right = obj_right;
225         const char *right_key = obj_right;
226         int cmp;
227
228         switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
229         case OBJ_POINTER:
230                 right_key = object_right->id;
231                 /* Fall through */
232         case OBJ_KEY:
233                 cmp = strcmp(object_left->id, right_key);
234                 break;
235         case OBJ_PARTIAL_KEY:
236                 /*
237                  * We could also use a partial key struct containing a length
238                  * so strlen() does not get called for every comparison instead.
239                  */
240                 cmp = strncmp(object_left->id, right_key, strlen(right_key));
241                 break;
242         default:
243                 /* Sort can only work on something with a full or partial key. */
244                 ast_assert(0);
245                 cmp = 0;
246                 break;
247         }
248         return cmp;
249 }
250
251 static void app_dtor(void *obj)
252 {
253         struct stasis_app *app = obj;
254
255         ast_verb(1, "Destroying Stasis app %s\n", app->name);
256
257         ast_assert(app->router == NULL);
258         ast_assert(app->bridge_merge_sub == NULL);
259
260         ao2_cleanup(app->topic);
261         app->topic = NULL;
262         ao2_cleanup(app->forwards);
263         app->forwards = NULL;
264         ao2_cleanup(app->data);
265         app->data = NULL;
266 }
267
268 static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
269 {
270         struct ast_multi_channel_blob *payload = stasis_message_data(message);
271         struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
272         struct ast_channel *chan;
273
274         if (!snapshot) {
275                 return;
276         }
277
278         chan = ast_channel_get_by_name(snapshot->uniqueid);
279         if (!chan) {
280                 return;
281         }
282
283         app_subscribe_channel(app, chan);
284         ast_channel_unref(chan);
285 }
286
287 static void sub_default_handler(void *data, struct stasis_subscription *sub,
288         struct stasis_message *message)
289 {
290         struct stasis_app *app = data;
291         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
292
293         if (stasis_subscription_final_message(sub, message)) {
294                 ao2_cleanup(app);
295         }
296
297         if (stasis_message_type(message) == ast_channel_dial_type()) {
298                 call_forwarded_handler(app, message);
299         }
300
301         /* By default, send any message that has a JSON representation */
302         json = stasis_message_to_json(message, stasis_app_get_sanitizer());
303         if (!json) {
304                 return;
305         }
306
307         app_send(app, json);
308 }
309
310 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
311 typedef struct ast_json *(*channel_snapshot_monitor)(
312         struct ast_channel_snapshot *old_snapshot,
313         struct ast_channel_snapshot *new_snapshot,
314         const struct timeval *tv);
315
316 static struct ast_json *simple_channel_event(
317         const char *type,
318         struct ast_channel_snapshot *snapshot,
319         const struct timeval *tv)
320 {
321         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
322
323         if (!json_channel) {
324                 return NULL;
325         }
326
327         return ast_json_pack("{s: s, s: o, s: o}",
328                 "type", type,
329                 "timestamp", ast_json_timeval(*tv, NULL),
330                 "channel", json_channel);
331 }
332
333 static struct ast_json *channel_created_event(
334         struct ast_channel_snapshot *snapshot,
335         const struct timeval *tv)
336 {
337         return simple_channel_event("ChannelCreated", snapshot, tv);
338 }
339
340 static struct ast_json *channel_destroyed_event(
341         struct ast_channel_snapshot *snapshot,
342         const struct timeval *tv)
343 {
344         struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
345
346         if (!json_channel) {
347                 return NULL;
348         }
349
350         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
351                 "type", "ChannelDestroyed",
352                 "timestamp", ast_json_timeval(*tv, NULL),
353                 "cause", snapshot->hangupcause,
354                 "cause_txt", ast_cause2str(snapshot->hangupcause),
355                 "channel", json_channel);
356 }
357
358 static struct ast_json *channel_state_change_event(
359         struct ast_channel_snapshot *snapshot,
360         const struct timeval *tv)
361 {
362         return simple_channel_event("ChannelStateChange", snapshot, tv);
363 }
364
365 /*! \brief Handle channel state changes */
366 static struct ast_json *channel_state(
367         struct ast_channel_snapshot *old_snapshot,
368         struct ast_channel_snapshot *new_snapshot,
369         const struct timeval *tv)
370 {
371         struct ast_channel_snapshot *snapshot = new_snapshot ?
372                 new_snapshot : old_snapshot;
373
374         if (!old_snapshot) {
375                 return channel_created_event(snapshot, tv);
376         } else if (!new_snapshot) {
377                 return channel_destroyed_event(snapshot, tv);
378         } else if (old_snapshot->state != new_snapshot->state) {
379                 return channel_state_change_event(snapshot, tv);
380         }
381
382         return NULL;
383 }
384
385 static struct ast_json *channel_dialplan(
386         struct ast_channel_snapshot *old_snapshot,
387         struct ast_channel_snapshot *new_snapshot,
388         const struct timeval *tv)
389 {
390         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
391         struct ast_json *json_channel;
392
393         /* No Newexten event on cache clear or first event */
394         if (!old_snapshot || !new_snapshot) {
395                 return NULL;
396         }
397
398         /* Empty application is not valid for a Newexten event */
399         if (ast_strlen_zero(new_snapshot->appl)) {
400                 return NULL;
401         }
402
403         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
404                 return NULL;
405         }
406
407         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
408         if (!json_channel) {
409                 return NULL;
410         }
411
412         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
413                 "type", "ChannelDialplan",
414                 "timestamp", ast_json_timeval(*tv, NULL),
415                 "dialplan_app", new_snapshot->appl,
416                 "dialplan_app_data", new_snapshot->data,
417                 "channel", json_channel);
418 }
419
420 static struct ast_json *channel_callerid(
421         struct ast_channel_snapshot *old_snapshot,
422         struct ast_channel_snapshot *new_snapshot,
423         const struct timeval *tv)
424 {
425         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
426         struct ast_json *json_channel;
427
428         /* No NewCallerid event on cache clear or first event */
429         if (!old_snapshot || !new_snapshot) {
430                 return NULL;
431         }
432
433         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
434                 return NULL;
435         }
436
437         json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
438         if (!json_channel) {
439                 return NULL;
440         }
441
442         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
443                 "type", "ChannelCallerId",
444                 "timestamp", ast_json_timeval(*tv, NULL),
445                 "caller_presentation", new_snapshot->caller_pres,
446                 "caller_presentation_txt", ast_describe_caller_presentation(
447                         new_snapshot->caller_pres),
448                 "channel", json_channel);
449 }
450
451 static channel_snapshot_monitor channel_monitors[] = {
452         channel_state,
453         channel_dialplan,
454         channel_callerid
455 };
456
457 static void sub_channel_update_handler(void *data,
458         struct stasis_subscription *sub,
459         struct stasis_message *message)
460 {
461         struct stasis_app *app = data;
462         struct stasis_cache_update *update;
463         struct ast_channel_snapshot *new_snapshot;
464         struct ast_channel_snapshot *old_snapshot;
465         const struct timeval *tv;
466         int i;
467
468         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
469
470         update = stasis_message_data(message);
471
472         ast_assert(update->type == ast_channel_snapshot_type());
473
474         new_snapshot = stasis_message_data(update->new_snapshot);
475         old_snapshot = stasis_message_data(update->old_snapshot);
476
477         /* Pull timestamp from the new snapshot, or from the update message
478          * when there isn't one. */
479         tv = update->new_snapshot ?
480                 stasis_message_timestamp(update->new_snapshot) :
481                 stasis_message_timestamp(message);
482
483         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
484                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
485
486                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
487                 if (msg) {
488                         app_send(app, msg);
489                 }
490         }
491 }
492
493 static struct ast_json *simple_endpoint_event(
494         const char *type,
495         struct ast_endpoint_snapshot *snapshot,
496         const struct timeval *tv)
497 {
498         struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
499
500         if (!json_endpoint) {
501                 return NULL;
502         }
503
504         return ast_json_pack("{s: s, s: o, s: o}",
505                 "type", type,
506                 "timestamp", ast_json_timeval(*tv, NULL),
507                 "endpoint", json_endpoint);
508 }
509
510 static void sub_endpoint_update_handler(void *data,
511         struct stasis_subscription *sub,
512         struct stasis_message *message)
513 {
514         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
515         struct stasis_app *app = data;
516         struct stasis_cache_update *update;
517         struct ast_endpoint_snapshot *new_snapshot;
518         const struct timeval *tv;
519
520         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
521
522         update = stasis_message_data(message);
523
524         ast_assert(update->type == ast_endpoint_snapshot_type());
525
526         new_snapshot = stasis_message_data(update->new_snapshot);
527         tv = update->new_snapshot ?
528                 stasis_message_timestamp(update->new_snapshot) :
529                 stasis_message_timestamp(message);
530
531         json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
532
533         if (!json) {
534                 return;
535         }
536
537         app_send(app, json);
538 }
539
540 static struct ast_json *simple_bridge_event(
541         const char *type,
542         struct ast_bridge_snapshot *snapshot,
543         const struct timeval *tv)
544 {
545         struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
546         if (!json_bridge) {
547                 return NULL;
548         }
549
550         return ast_json_pack("{s: s, s: o, s: o}",
551                 "type", type,
552                 "timestamp", ast_json_timeval(*tv, NULL),
553                 "bridge", json_bridge);
554 }
555
556 static void sub_bridge_update_handler(void *data,
557         struct stasis_subscription *sub,
558         struct stasis_message *message)
559 {
560         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
561         struct stasis_app *app = data;
562         struct stasis_cache_update *update;
563         struct ast_bridge_snapshot *new_snapshot;
564         struct ast_bridge_snapshot *old_snapshot;
565         const struct timeval *tv;
566
567         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
568
569         update = stasis_message_data(message);
570
571         ast_assert(update->type == ast_bridge_snapshot_type());
572
573         new_snapshot = stasis_message_data(update->new_snapshot);
574         old_snapshot = stasis_message_data(update->old_snapshot);
575         tv = update->new_snapshot ?
576                 stasis_message_timestamp(update->new_snapshot) :
577                 stasis_message_timestamp(message);
578
579         if (!new_snapshot) {
580                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
581         } else if (!old_snapshot) {
582                 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
583         }
584
585         if (!json) {
586                 return;
587         }
588
589         app_send(app, json);
590 }
591
592 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
593         struct stasis_message *message)
594 {
595         struct stasis_app *app = data;
596         struct ast_bridge_merge_message *merge;
597         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
598
599         if (stasis_subscription_final_message(sub, message)) {
600                 ao2_cleanup(app);
601         }
602
603         if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
604                 return;
605         }
606
607         merge = stasis_message_data(message);
608
609         /* Find out if we're subscribed to either bridge */
610         forwards = ao2_find(app->forwards, merge->from->uniqueid,
611                 OBJ_SEARCH_KEY);
612         if (!forwards) {
613                 forwards = ao2_find(app->forwards, merge->to->uniqueid,
614                         OBJ_SEARCH_KEY);
615         }
616
617         if (!forwards) {
618                 return;
619         }
620
621         /* Forward the message to the app */
622         stasis_publish(app->topic, message);
623 }
624
625 struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
626 {
627         RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
628         size_t size;
629         int res = 0;
630
631         ast_assert(name != NULL);
632         ast_assert(handler != NULL);
633
634         ast_verb(1, "Creating Stasis app '%s'\n", name);
635
636         size = sizeof(*app) + strlen(name) + 1;
637         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
638
639         if (!app) {
640                 return NULL;
641         }
642
643         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
644                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
645                 forwards_sort, NULL);
646         if (!app->forwards) {
647                 return NULL;
648         }
649
650         app->topic = stasis_topic_create(name);
651         if (!app->topic) {
652                 return NULL;
653         }
654
655         app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
656                 bridge_merge_handler, app);
657         if (!app->bridge_merge_sub) {
658                 return NULL;
659         }
660         /* Subscription holds a reference */
661         ao2_ref(app, +1);
662
663         app->router = stasis_message_router_create(app->topic);
664         if (!app->router) {
665                 return NULL;
666         }
667
668         res |= stasis_message_router_add_cache_update(app->router,
669                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
670
671         res |= stasis_message_router_add_cache_update(app->router,
672                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
673
674         res |= stasis_message_router_add_cache_update(app->router,
675                 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
676
677         res |= stasis_message_router_set_default(app->router,
678                 sub_default_handler, app);
679
680         if (res != 0) {
681                 return NULL;
682         }
683         /* Router holds a reference */
684         ao2_ref(app, +1);
685
686         strncpy(app->name, name, size - sizeof(*app));
687         app->handler = handler;
688         ao2_ref(data, +1);
689         app->data = data;
690
691         ao2_ref(app, +1);
692         return app;
693 }
694
695 /*!
696  * \brief Send a message to the given application.
697  * \param app App to send the message to.
698  * \param message Message to send.
699  */
700 void app_send(struct stasis_app *app, struct ast_json *message)
701 {
702         stasis_app_cb handler;
703         RAII_VAR(void *, data, NULL, ao2_cleanup);
704
705         /* Copy off mutable state with lock held */
706         {
707                 SCOPED_AO2LOCK(lock, app);
708                 handler = app->handler;
709                 if (app->data) {
710                         ao2_ref(app->data, +1);
711                         data = app->data;
712                 }
713                 /* Name is immutable; no need to copy */
714         }
715
716         if (!handler) {
717                 ast_verb(3,
718                         "Inactive Stasis app '%s' missed message\n", app->name);
719                 return;
720         }
721
722         handler(data, app->name, message);
723 }
724
725 void app_deactivate(struct stasis_app *app)
726 {
727         SCOPED_AO2LOCK(lock, app);
728         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
729         app->handler = NULL;
730         ao2_cleanup(app->data);
731         app->data = NULL;
732 }
733
734 void app_shutdown(struct stasis_app *app)
735 {
736         SCOPED_AO2LOCK(lock, app);
737
738         ast_assert(app_is_finished(app));
739
740         stasis_message_router_unsubscribe(app->router);
741         app->router = NULL;
742         stasis_unsubscribe(app->bridge_merge_sub);
743         app->bridge_merge_sub = NULL;
744 }
745
746 int app_is_active(struct stasis_app *app)
747 {
748         SCOPED_AO2LOCK(lock, app);
749         return app->handler != NULL;
750 }
751
752 int app_is_finished(struct stasis_app *app)
753 {
754         SCOPED_AO2LOCK(lock, app);
755
756         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
757 }
758
759 void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
760 {
761         SCOPED_AO2LOCK(lock, app);
762
763         if (app->handler) {
764                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
765
766                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
767
768                 msg = ast_json_pack("{s: s, s: s}",
769                         "type", "ApplicationReplaced",
770                         "application", app->name);
771                 if (msg) {
772                         app_send(app, msg);
773                 }
774         } else {
775                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
776         }
777
778         app->handler = handler;
779         ao2_cleanup(app->data);
780         if (data) {
781                 ao2_ref(data, +1);
782         }
783         app->data = data;
784 }
785
786 const char *app_name(const struct stasis_app *app)
787 {
788         return app->name;
789 }
790
791 struct ast_json *app_to_json(const struct stasis_app *app)
792 {
793         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
794         struct ast_json *channels;
795         struct ast_json *bridges;
796         struct ast_json *endpoints;
797         struct ao2_iterator i;
798         void *obj;
799
800         json = ast_json_pack("{s: s, s: [], s: [], s: []}",
801                 "name", app->name,
802                 "channel_ids", "bridge_ids", "endpoint_ids");
803         channels = ast_json_object_get(json, "channel_ids");
804         bridges = ast_json_object_get(json, "bridge_ids");
805         endpoints = ast_json_object_get(json, "endpoint_ids");
806
807         i = ao2_iterator_init(app->forwards, 0);
808         while ((obj = ao2_iterator_next(&i))) {
809                 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
810                 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
811                 int append_res = -1;
812
813                 id = ast_json_string_create(forwards->id);
814
815                 switch (forwards->forward_type) {
816                 case FORWARD_CHANNEL:
817                         append_res = ast_json_array_append(channels,
818                                 ast_json_ref(id));
819                         break;
820                 case FORWARD_BRIDGE:
821                         append_res = ast_json_array_append(bridges,
822                                 ast_json_ref(id));
823                         break;
824                 case FORWARD_ENDPOINT:
825                         append_res = ast_json_array_append(endpoints,
826                                 ast_json_ref(id));
827                         break;
828                 }
829
830                 if (append_res != 0) {
831                         ast_log(LOG_ERROR, "Error building response\n");
832                         ao2_iterator_destroy(&i);
833                         return NULL;
834                 }
835         }
836         ao2_iterator_destroy(&i);
837
838         return ast_json_ref(json);
839 }
840
841 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
842 {
843         int res;
844
845         if (!app || !chan) {
846                 return -1;
847         } else {
848                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
849                 SCOPED_AO2LOCK(lock, app->forwards);
850
851                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
852                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
853                 if (!forwards) {
854                         /* Forwards not found, create one */
855                         forwards = forwards_create_channel(app, chan);
856                         if (!forwards) {
857                                 return -1;
858                         }
859
860                         res = ao2_link_flags(app->forwards, forwards,
861                                 OBJ_NOLOCK);
862                         if (!res) {
863                                 return -1;
864                         }
865                 }
866
867                 ++forwards->interested;
868                 return 0;
869         }
870 }
871
872 static int subscribe_channel(struct stasis_app *app, void *obj)
873 {
874         return app_subscribe_channel(app, obj);
875 }
876
877 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
878 {
879         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
880         SCOPED_AO2LOCK(lock, app->forwards);
881
882         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
883         if (!forwards) {
884                 ast_log(LOG_WARNING,
885                         "App '%s' not subscribed to %s '%s'\n",
886                         app->name, kind, id);
887                 return -1;
888         }
889
890         if (--forwards->interested == 0) {
891                 /* No one is interested any more; unsubscribe */
892                 forwards_unsubscribe(forwards);
893                 ao2_find(app->forwards, forwards,
894                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
895                         OBJ_NODATA);
896         }
897
898         return 0;
899 }
900
901 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
902 {
903         if (!app || !chan) {
904                 return -1;
905         }
906
907         return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
908 }
909
910 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
911 {
912         if (!app || !channel_id) {
913                 return -1;
914         }
915
916         return unsubscribe(app, "channel", channel_id);
917 }
918
919 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
920 {
921         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
922         forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
923         return forwards != NULL;
924 }
925
926 static void *channel_find(const struct stasis_app *app, const char *id)
927 {
928         return ast_channel_get_by_name(id);
929 }
930
931 struct stasis_app_event_source channel_event_source = {
932         .scheme = "channel:",
933         .find = channel_find,
934         .subscribe = subscribe_channel,
935         .unsubscribe = app_unsubscribe_channel_id,
936         .is_subscribed = app_is_subscribed_channel_id
937 };
938
939 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
940 {
941         if (!app || !bridge) {
942                 return -1;
943         } else {
944                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
945                 SCOPED_AO2LOCK(lock, app->forwards);
946
947                 forwards = ao2_find(app->forwards, bridge->uniqueid,
948                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
949
950                 if (!forwards) {
951                         /* Forwards not found, create one */
952                         forwards = forwards_create_bridge(app, bridge);
953                         if (!forwards) {
954                                 return -1;
955                         }
956                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
957                 }
958
959                 ++forwards->interested;
960                 return 0;
961         }
962 }
963
964 static int subscribe_bridge(struct stasis_app *app, void *obj)
965 {
966         return app_subscribe_bridge(app, obj);
967 }
968
969 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
970 {
971         if (!app || !bridge) {
972                 return -1;
973         }
974
975         return app_unsubscribe_bridge_id(app, bridge->uniqueid);
976 }
977
978 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
979 {
980         if (!app || !bridge_id) {
981                 return -1;
982         }
983
984         return unsubscribe(app, "bridge", bridge_id);
985 }
986
987 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
988 {
989         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
990         forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
991         return forwards != NULL;
992 }
993
994 static void *bridge_find(const struct stasis_app *app, const char *id)
995 {
996         return stasis_app_bridge_find_by_id(id);
997 }
998
999 struct stasis_app_event_source bridge_event_source = {
1000         .scheme = "bridge:",
1001         .find = bridge_find,
1002         .subscribe = subscribe_bridge,
1003         .unsubscribe = app_unsubscribe_bridge_id,
1004         .is_subscribed = app_is_subscribed_bridge_id
1005 };
1006
1007 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1008 {
1009         if (!app || !endpoint) {
1010                 return -1;
1011         } else {
1012                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1013                 SCOPED_AO2LOCK(lock, app->forwards);
1014
1015                 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
1016                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
1017
1018                 if (!forwards) {
1019                         /* Forwards not found, create one */
1020                         forwards = forwards_create_endpoint(app, endpoint);
1021                         if (!forwards) {
1022                                 return -1;
1023                         }
1024                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1025                 }
1026
1027                 ++forwards->interested;
1028                 return 0;
1029         }
1030 }
1031
1032 static int subscribe_endpoint(struct stasis_app *app, void *obj)
1033 {
1034         return app_subscribe_endpoint(app, obj);
1035 }
1036
1037 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1038 {
1039         if (!app || !endpoint_id) {
1040                 return -1;
1041         }
1042
1043         return unsubscribe(app, "endpoint", endpoint_id);
1044 }
1045
1046 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1047 {
1048         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
1049         forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1050         return forwards != NULL;
1051 }
1052
1053 static void *endpoint_find(const struct stasis_app *app, const char *id)
1054 {
1055         return ast_endpoint_find_by_id(id);
1056 }
1057
1058 struct stasis_app_event_source endpoint_event_source = {
1059         .scheme = "endpoint:",
1060         .find = endpoint_find,
1061         .subscribe = subscribe_endpoint,
1062         .unsubscribe = app_unsubscribe_endpoint_id,
1063         .is_subscribed = app_is_subscribed_endpoint_id
1064 };
1065
1066 void stasis_app_register_event_sources(void)
1067 {
1068         stasis_app_register_event_source(&channel_event_source);
1069         stasis_app_register_event_source(&bridge_event_source);
1070         stasis_app_register_event_source(&endpoint_event_source);
1071 }
1072
1073 int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
1074 {
1075         return obj == &endpoint_event_source ||
1076                 obj == &bridge_event_source ||
1077                 obj == &channel_event_source;
1078 }
1079
1080 void stasis_app_unregister_event_sources(void)
1081 {
1082         stasis_app_unregister_event_source(&endpoint_event_source);
1083         stasis_app_unregister_event_source(&bridge_event_source);
1084         stasis_app_unregister_event_source(&channel_event_source);
1085 }
1086
1087