Fix build warnings
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_message_router.h"
37
38 struct app {
39         /*! Aggregation topic for this application. */
40         struct stasis_topic *topic;
41         /*! Router for handling messages forwarded to \a topic. */
42         struct stasis_message_router *router;
43         /*! Subscription to watch for bridge merge messages */
44         struct stasis_subscription *bridge_merge_sub;
45         /*! Container of the channel forwards to this app's topic. */
46         struct ao2_container *forwards;
47         /*! Callback function for this application. */
48         stasis_app_cb handler;
49         /*! Opaque data to hand to callback function. */
50         void *data;
51         /*! Name of the Stasis application */
52         char name[];
53 };
54
55 /*! Subscription info for a particular channel/bridge. */
56 struct app_forwards {
57         /*! Count of number of times this channel/bridge has been subscribed */
58         int interested;
59
60         /*! Forward for the regular topic */
61         struct stasis_subscription *topic_forward;
62         /*! Forward for the caching topic */
63         struct stasis_subscription *topic_cached_forward;
64
65         /*! Unique id of the object being forwarded */
66         char id[];
67 };
68
69 static void forwards_dtor(void *obj)
70 {
71 #ifdef AST_DEVMODE
72         struct app_forwards *forwards = obj;
73 #endif /* AST_DEVMODE */
74
75         ast_assert(forwards->topic_forward == NULL);
76         ast_assert(forwards->topic_cached_forward == NULL);
77 }
78
79 static void forwards_unsubscribe(struct app_forwards *forwards)
80 {
81         stasis_unsubscribe(forwards->topic_forward);
82         forwards->topic_forward = NULL;
83         stasis_unsubscribe(forwards->topic_cached_forward);
84         forwards->topic_cached_forward = NULL;
85 }
86
87 static struct app_forwards *forwards_create(struct app *app,
88         const char *id)
89 {
90         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
91
92         if (!app || ast_strlen_zero(id)) {
93                 return NULL;
94         }
95
96         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
97         if (!forwards) {
98                 return NULL;
99         }
100
101         strcpy(forwards->id, id);
102
103         ao2_ref(forwards, +1);
104         return forwards;
105 }
106
107 /*! Forward a channel's topics to an app */
108 static struct app_forwards *forwards_create_channel(struct app *app,
109         struct ast_channel *chan)
110 {
111         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
112
113         if (!app || !chan) {
114                 return NULL;
115         }
116
117         forwards = forwards_create(app, ast_channel_uniqueid(chan));
118         if (!forwards) {
119                 return NULL;
120         }
121
122         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
123                 app->topic);
124         if (!forwards->topic_forward) {
125                 return NULL;
126         }
127
128         forwards->topic_cached_forward = stasis_forward_all(
129                 ast_channel_topic_cached(chan), app->topic);
130         if (!forwards->topic_cached_forward) {
131                 /* Half-subscribed is a bad thing */
132                 stasis_unsubscribe(forwards->topic_forward);
133                 forwards->topic_forward = NULL;
134                 return NULL;
135         }
136
137         ao2_ref(forwards, +1);
138         return forwards;
139 }
140
141 /*! Forward a bridge's topics to an app */
142 static struct app_forwards *forwards_create_bridge(struct app *app,
143         struct ast_bridge *bridge)
144 {
145         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
146
147         if (!app || !bridge) {
148                 return NULL;
149         }
150
151         forwards = forwards_create(app, bridge->uniqueid);
152         if (!forwards) {
153                 return NULL;
154         }
155
156         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
157                 app->topic);
158         if (!forwards->topic_forward) {
159                 return NULL;
160         }
161
162         forwards->topic_cached_forward = stasis_forward_all(
163                 ast_bridge_topic_cached(bridge), app->topic);
164         if (!forwards->topic_cached_forward) {
165                 /* Half-subscribed is a bad thing */
166                 stasis_unsubscribe(forwards->topic_forward);
167                 forwards->topic_forward = NULL;
168                 return NULL;
169         }
170
171         ao2_ref(forwards, +1);
172         return forwards;
173 }
174
175 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
176 {
177     const struct app_forwards *object_left = obj_left;
178     const struct app_forwards *object_right = obj_right;
179     const char *right_key = obj_right;
180     int cmp;
181
182     switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
183     case OBJ_POINTER:
184         right_key = object_right->id;
185         /* Fall through */
186     case OBJ_KEY:
187         cmp = strcmp(object_left->id, right_key);
188         break;
189     case OBJ_PARTIAL_KEY:
190         /*
191          * We could also use a partial key struct containing a length
192          * so strlen() does not get called for every comparison instead.
193          */
194         cmp = strncmp(object_left->id, right_key, strlen(right_key));
195         break;
196     default:
197         /* Sort can only work on something with a full or partial key. */
198         ast_assert(0);
199         cmp = 0;
200         break;
201     }
202     return cmp;
203 }
204
205 static void app_dtor(void *obj)
206 {
207         struct app *app = obj;
208
209         ast_verb(1, "Destroying Stasis app %s\n", app->name);
210
211         ast_assert(app->router == NULL);
212         ast_assert(app->bridge_merge_sub == NULL);
213
214         ao2_cleanup(app->topic);
215         app->topic = NULL;
216         ao2_cleanup(app->forwards);
217         app->forwards = NULL;
218         ao2_cleanup(app->data);
219         app->data = NULL;
220 }
221
222 static void sub_default_handler(void *data, struct stasis_subscription *sub,
223         struct stasis_topic *topic, struct stasis_message *message)
224 {
225         struct app *app = data;
226         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
227
228         if (stasis_subscription_final_message(sub, message)) {
229                 ao2_cleanup(app);
230         }
231
232         /* By default, send any message that has a JSON representation */
233         json = stasis_message_to_json(message);
234         if (!json) {
235                 return;
236         }
237
238         app_send(app, json);
239 }
240
241 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
242 typedef struct ast_json *(*channel_snapshot_monitor)(
243         struct ast_channel_snapshot *old_snapshot,
244         struct ast_channel_snapshot *new_snapshot,
245         const struct timeval *tv);
246
247 static struct ast_json *simple_channel_event(
248         const char *type,
249         struct ast_channel_snapshot *snapshot,
250         const struct timeval *tv)
251 {
252         return ast_json_pack("{s: s, s: o, s: o}",
253                 "type", type,
254                 "timestamp", ast_json_timeval(*tv, NULL),
255                 "channel", ast_channel_snapshot_to_json(snapshot));
256 }
257
258 static struct ast_json *channel_created_event(
259         struct ast_channel_snapshot *snapshot,
260         const struct timeval *tv)
261 {
262         return simple_channel_event("ChannelCreated", snapshot, tv);
263 }
264
265 static struct ast_json *channel_destroyed_event(
266         struct ast_channel_snapshot *snapshot,
267         const struct timeval *tv)
268 {
269         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
270                 "type", "ChannelDestroyed",
271                 "timestamp", ast_json_timeval(*tv, NULL),
272                 "cause", snapshot->hangupcause,
273                 "cause_txt", ast_cause2str(snapshot->hangupcause),
274                 "channel", ast_channel_snapshot_to_json(snapshot));
275 }
276
277 static struct ast_json *channel_state_change_event(
278         struct ast_channel_snapshot *snapshot,
279         const struct timeval *tv)
280 {
281         return simple_channel_event("ChannelStateChange", snapshot, tv);
282 }
283
284 /*! \brief Handle channel state changes */
285 static struct ast_json *channel_state(
286         struct ast_channel_snapshot *old_snapshot,
287         struct ast_channel_snapshot *new_snapshot,
288         const struct timeval *tv)
289 {
290         struct ast_channel_snapshot *snapshot = new_snapshot ?
291                 new_snapshot : old_snapshot;
292
293         if (!old_snapshot) {
294                 return channel_created_event(snapshot, tv);
295         } else if (!new_snapshot) {
296                 return channel_destroyed_event(snapshot, tv);
297         } else if (old_snapshot->state != new_snapshot->state) {
298                 return channel_state_change_event(snapshot, tv);
299         }
300
301         return NULL;
302 }
303
304 static struct ast_json *channel_dialplan(
305         struct ast_channel_snapshot *old_snapshot,
306         struct ast_channel_snapshot *new_snapshot,
307         const struct timeval *tv)
308 {
309         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
310
311         /* No Newexten event on cache clear or first event */
312         if (!old_snapshot || !new_snapshot) {
313                 return NULL;
314         }
315
316         /* Empty application is not valid for a Newexten event */
317         if (ast_strlen_zero(new_snapshot->appl)) {
318                 return NULL;
319         }
320
321         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
322                 return NULL;
323         }
324
325         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
326                 "type", "ChannelDialplan",
327                 "timestamp", ast_json_timeval(*tv, NULL),
328                 "dialplan_app", new_snapshot->appl,
329                 "dialplan_app_data", new_snapshot->data,
330                 "channel", ast_channel_snapshot_to_json(new_snapshot));
331 }
332
333 static struct ast_json *channel_callerid(
334         struct ast_channel_snapshot *old_snapshot,
335         struct ast_channel_snapshot *new_snapshot,
336         const struct timeval *tv)
337 {
338         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
339
340         /* No NewCallerid event on cache clear or first event */
341         if (!old_snapshot || !new_snapshot) {
342                 return NULL;
343         }
344
345         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
346                 return NULL;
347         }
348
349         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
350                 "type", "ChannelCallerId",
351                 "timestamp", ast_json_timeval(*tv, NULL),
352                 "caller_presentation", new_snapshot->caller_pres,
353                 "caller_presentation_txt", ast_describe_caller_presentation(
354                         new_snapshot->caller_pres),
355                 "channel", ast_channel_snapshot_to_json(new_snapshot));
356 }
357
358 static channel_snapshot_monitor channel_monitors[] = {
359         channel_state,
360         channel_dialplan,
361         channel_callerid
362 };
363
364 static void sub_channel_update_handler(void *data,
365                 struct stasis_subscription *sub,
366                 struct stasis_topic *topic,
367                 struct stasis_message *message)
368 {
369         struct app *app = data;
370         struct stasis_cache_update *update;
371         struct ast_channel_snapshot *new_snapshot;
372         struct ast_channel_snapshot *old_snapshot;
373         const struct timeval *tv;
374         int i;
375
376         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
377
378         update = stasis_message_data(message);
379
380         ast_assert(update->type == ast_channel_snapshot_type());
381
382         new_snapshot = stasis_message_data(update->new_snapshot);
383         old_snapshot = stasis_message_data(update->old_snapshot);
384
385         /* Pull timestamp from the new snapshot, or from the update message
386          * when there isn't one. */
387         tv = update->new_snapshot ?
388                 stasis_message_timestamp(update->new_snapshot) :
389                 stasis_message_timestamp(message);
390
391         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
392                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
393
394                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
395                 if (msg) {
396                         app_send(app, msg);
397                 }
398         }
399 }
400
401 static struct ast_json *simple_bridge_event(
402         const char *type,
403         struct ast_bridge_snapshot *snapshot,
404         const struct timeval *tv)
405 {
406         return ast_json_pack("{s: s, s: o, s: o}",
407                 "type", type,
408                 "timestamp", ast_json_timeval(*tv, NULL),
409                 "bridge", ast_bridge_snapshot_to_json(snapshot));
410 }
411
412 static void sub_bridge_update_handler(void *data,
413                 struct stasis_subscription *sub,
414                 struct stasis_topic *topic,
415                 struct stasis_message *message)
416 {
417         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
418         struct app *app = data;
419         struct stasis_cache_update *update;
420         struct ast_bridge_snapshot *new_snapshot;
421         struct ast_bridge_snapshot *old_snapshot;
422         const struct timeval *tv;
423
424         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
425
426         update = stasis_message_data(message);
427
428         ast_assert(update->type == ast_bridge_snapshot_type());
429
430         new_snapshot = stasis_message_data(update->new_snapshot);
431         old_snapshot = stasis_message_data(update->old_snapshot);
432         tv = update->new_snapshot ?
433                 stasis_message_timestamp(update->new_snapshot) :
434                 stasis_message_timestamp(message);
435
436         if (!new_snapshot) {
437                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
438         } else if (!old_snapshot) {
439                 json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
440         }
441
442         if (!json) {
443                 return;
444         }
445
446         app_send(app, json);
447 }
448
449 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
450         struct stasis_topic *topic, struct stasis_message *message)
451 {
452         struct app *app = data;
453         struct ast_bridge_merge_message *merge;
454         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
455
456         if (stasis_subscription_final_message(sub, message)) {
457                 ao2_cleanup(app);
458         }
459
460         if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
461                 return;
462         }
463
464         merge = stasis_message_data(message);
465
466         /* Find out if we're subscribed to either bridge */
467         forwards = ao2_find(app->forwards, merge->from->uniqueid,
468                 OBJ_SEARCH_KEY);
469         if (!forwards) {
470                 forwards = ao2_find(app->forwards, merge->to->uniqueid,
471                         OBJ_SEARCH_KEY);
472         }
473
474         if (!forwards) {
475                 return;
476         }
477
478         /* Forward the message to the app */
479         stasis_forward_message(app->topic, topic, message);
480 }
481
482 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
483 {
484         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
485         size_t size;
486         int res = 0;
487
488         ast_assert(name != NULL);
489         ast_assert(handler != NULL);
490
491         ast_verb(1, "Creating Stasis app '%s'\n", name);
492
493         size = sizeof(*app) + strlen(name) + 1;
494         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
495
496         if (!app) {
497                 return NULL;
498         }
499
500         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
501                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
502                 forwards_sort, NULL);
503         if (!app->forwards) {
504                 return NULL;
505         }
506
507         app->topic = stasis_topic_create(name);
508         if (!app->topic) {
509                 return NULL;
510         }
511
512         app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
513                 bridge_merge_handler, app);
514         if (!app->bridge_merge_sub) {
515                 return NULL;
516         }
517         /* Subscription holds a reference */
518         ao2_ref(app, +1);
519
520         app->router = stasis_message_router_create(app->topic);
521         if (!app->router) {
522                 return NULL;
523         }
524
525         res |= stasis_message_router_add_cache_update(app->router,
526                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
527
528         res |= stasis_message_router_add_cache_update(app->router,
529                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
530
531         res |= stasis_message_router_set_default(app->router,
532                 sub_default_handler, app);
533
534         if (res != 0) {
535                 return NULL;
536         }
537         /* Router holds a reference */
538         ao2_ref(app, +1);
539
540         strncpy(app->name, name, size - sizeof(*app));
541         app->handler = handler;
542         ao2_ref(data, +1);
543         app->data = data;
544
545         ao2_ref(app, +1);
546         return app;
547 }
548
549 /*!
550  * \brief Send a message to the given application.
551  * \param app App to send the message to.
552  * \param message Message to send.
553  */
554 void app_send(struct app *app, struct ast_json *message)
555 {
556         stasis_app_cb handler;
557         RAII_VAR(void *, data, NULL, ao2_cleanup);
558
559         /* Copy off mutable state with lock held */
560         {
561                 SCOPED_AO2LOCK(lock, app);
562                 handler = app->handler;
563                 if (app->data) {
564                         ao2_ref(app->data, +1);
565                         data = app->data;
566                 }
567                 /* Name is immutable; no need to copy */
568         }
569
570         if (!handler) {
571                 ast_verb(3,
572                         "Inactive Stasis app '%s' missed message\n", app->name);
573                 return;
574         }
575
576         handler(data, app->name, message);
577 }
578
579 void app_deactivate(struct app *app)
580 {
581         SCOPED_AO2LOCK(lock, app);
582         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
583         app->handler = NULL;
584         ao2_cleanup(app->data);
585         app->data = NULL;
586 }
587
588 void app_shutdown(struct app *app)
589 {
590         SCOPED_AO2LOCK(lock, app);
591
592         ast_assert(app_is_finished(app));
593
594         stasis_message_router_unsubscribe(app->router);
595         app->router = NULL;
596         stasis_unsubscribe(app->bridge_merge_sub);
597         app->bridge_merge_sub = NULL;
598 }
599
600 int app_is_active(struct app *app)
601 {
602         SCOPED_AO2LOCK(lock, app);
603         return app->handler != NULL;
604 }
605
606 int app_is_finished(struct app *app)
607 {
608         SCOPED_AO2LOCK(lock, app);
609
610         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
611 }
612
613 void app_update(struct app *app, stasis_app_cb handler, void *data)
614 {
615         SCOPED_AO2LOCK(lock, app);
616
617         if (app->handler) {
618                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
619
620                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
621
622                 msg = ast_json_pack("{s: s, s: s}",
623                         "type", "ApplicationReplaced",
624                         "application", app->name);
625                 if (msg) {
626                         app_send(app, msg);
627                 }
628         } else {
629                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
630         }
631
632         app->handler = handler;
633         ao2_cleanup(app->data);
634         if (data) {
635                 ao2_ref(data, +1);
636         }
637         app->data = data;
638 }
639
640 const char *app_name(const struct app *app)
641 {
642         return app->name;
643 }
644
645 int app_subscribe_channel(struct app *app, struct ast_channel *chan)
646 {
647         int res;
648
649         if (!app || !chan) {
650                 return -1;
651         } else {
652                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
653                 SCOPED_AO2LOCK(lock, app->forwards);
654
655                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
656                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
657                 if (!forwards) {
658                         /* Forwards not found, create one */
659                         forwards = forwards_create_channel(app, chan);
660                         if (!forwards) {
661                                 return -1;
662                         }
663
664                         res = ao2_link_flags(app->forwards, forwards,
665                                 OBJ_NOLOCK);
666                         if (!res) {
667                                 return -1;
668                         }
669                 }
670
671                 ++forwards->interested;
672                 return 0;
673         }
674 }
675
676 static int unsubscribe(struct app *app, const char *kind, const char *id)
677 {
678         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
679         SCOPED_AO2LOCK(lock, app->forwards);
680
681         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
682         if (!forwards) {
683                 ast_log(LOG_ERROR,
684                         "App '%s' not subscribed to %s '%s'",
685                         app->name, kind, id);
686                 return -1;
687         }
688
689         if (--forwards->interested == 0) {
690                 /* No one is interested any more; unsubscribe */
691                 forwards_unsubscribe(forwards);
692                 ao2_find(app->forwards, forwards,
693                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
694                         OBJ_NODATA);
695         }
696
697         return 0;
698 }
699
700 int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
701 {
702         if (!app || !chan) {
703                 return -1;
704         }
705
706         return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
707 }
708
709 int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
710 {
711         if (!app || !bridge) {
712                 return -1;
713         } else {
714                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
715                 SCOPED_AO2LOCK(lock, app->forwards);
716
717                 forwards = ao2_find(app->forwards, bridge->uniqueid,
718                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
719
720                 if (!forwards) {
721                         /* Forwards not found, create one */
722                         forwards = forwards_create_bridge(app, bridge);
723                         if (!forwards) {
724                                 return -1;
725                         }
726                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
727                 }
728
729                 ++forwards->interested;
730                 return 0;
731         }
732 }
733
734 int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
735 {
736         if (!app || !bridge) {
737                 return -1;
738         }
739
740         return unsubscribe(app, "bridge", bridge->uniqueid);
741 }