8abe0c19c23be80f829fd0d234084ba8de089417
[asterisk/asterisk.git] / res / stasis / app.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "app.h"
31
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_message_router.h"
37
38 struct app {
39         /*! Aggregation topic for this application. */
40         struct stasis_topic *topic;
41         /*! Router for handling messages forwarded to \a topic. */
42         struct stasis_message_router *router;
43         /*! Subscription to watch for bridge merge messages */
44         struct stasis_subscription *bridge_merge_sub;
45         /*! Container of the channel forwards to this app's topic. */
46         struct ao2_container *forwards;
47         /*! Callback function for this application. */
48         stasis_app_cb handler;
49         /*! Opaque data to hand to callback function. */
50         void *data;
51         /*! Name of the Stasis application */
52         char name[];
53 };
54
/*! Forwarding state for a single subscribed channel or bridge. */
struct app_forwards {
	/*! How many times this channel/bridge has been subscribed to. */
	int interested;

	/*! Forward of the object's regular topic. */
	struct stasis_subscription *topic_forward;
	/*! Forward of the object's caching topic. */
	struct stasis_subscription *topic_cached_forward;

	/*! Unique id of the forwarded object, stored inline after the struct. */
	char id[];
};
68
69 static void forwards_dtor(void *obj)
70 {
71         struct app_forwards *forwards = obj;
72
73         ast_assert(forwards->topic_forward == NULL);
74         ast_assert(forwards->topic_cached_forward == NULL);
75 }
76
77 static void forwards_unsubscribe(struct app_forwards *forwards)
78 {
79         stasis_unsubscribe(forwards->topic_forward);
80         forwards->topic_forward = NULL;
81         stasis_unsubscribe(forwards->topic_cached_forward);
82         forwards->topic_cached_forward = NULL;
83 }
84
85 static struct app_forwards *forwards_create(struct app *app,
86         const char *id)
87 {
88         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
89
90         if (!app || ast_strlen_zero(id)) {
91                 return NULL;
92         }
93
94         forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
95         if (!forwards) {
96                 return NULL;
97         }
98
99         strcpy(forwards->id, id);
100
101         ao2_ref(forwards, +1);
102         return forwards;
103 }
104
105 /*! Forward a channel's topics to an app */
106 static struct app_forwards *forwards_create_channel(struct app *app,
107         struct ast_channel *chan)
108 {
109         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
110
111         if (!app || !chan) {
112                 return NULL;
113         }
114
115         forwards = forwards_create(app, ast_channel_uniqueid(chan));
116         if (!forwards) {
117                 return NULL;
118         }
119
120         forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
121                 app->topic);
122         if (!forwards->topic_forward) {
123                 return NULL;
124         }
125
126         forwards->topic_cached_forward = stasis_forward_all(
127                 ast_channel_topic_cached(chan), app->topic);
128         if (!forwards->topic_cached_forward) {
129                 /* Half-subscribed is a bad thing */
130                 stasis_unsubscribe(forwards->topic_forward);
131                 forwards->topic_forward = NULL;
132                 return NULL;
133         }
134
135         ao2_ref(forwards, +1);
136         return forwards;
137 }
138
139 /*! Forward a bridge's topics to an app */
140 static struct app_forwards *forwards_create_bridge(struct app *app,
141         struct ast_bridge *bridge)
142 {
143         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
144
145         if (!app || !bridge) {
146                 return NULL;
147         }
148
149         forwards = forwards_create(app, bridge->uniqueid);
150         if (!forwards) {
151                 return NULL;
152         }
153
154         forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
155                 app->topic);
156         if (!forwards->topic_forward) {
157                 return NULL;
158         }
159
160         forwards->topic_cached_forward = stasis_forward_all(
161                 ast_bridge_topic_cached(bridge), app->topic);
162         if (!forwards->topic_cached_forward) {
163                 /* Half-subscribed is a bad thing */
164                 stasis_unsubscribe(forwards->topic_forward);
165                 forwards->topic_forward = NULL;
166                 return NULL;
167         }
168
169         ao2_ref(forwards, +1);
170         return forwards;
171 }
172
173 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
174 {
175     const struct app_forwards *object_left = obj_left;
176     const struct app_forwards *object_right = obj_right;
177     const char *right_key = obj_right;
178     int cmp;
179
180     switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
181     case OBJ_POINTER:
182         right_key = object_right->id;
183         /* Fall through */
184     case OBJ_KEY:
185         cmp = strcmp(object_left->id, right_key);
186         break;
187     case OBJ_PARTIAL_KEY:
188         /*
189          * We could also use a partial key struct containing a length
190          * so strlen() does not get called for every comparison instead.
191          */
192         cmp = strncmp(object_left->id, right_key, strlen(right_key));
193         break;
194     default:
195         /* Sort can only work on something with a full or partial key. */
196         ast_assert(0);
197         cmp = 0;
198         break;
199     }
200     return cmp;
201 }
202
203 static void app_dtor(void *obj)
204 {
205         struct app *app = obj;
206
207         ast_verb(1, "Destroying Stasis app %s\n", app->name);
208
209         ast_assert(app->router == NULL);
210         ast_assert(app->bridge_merge_sub == NULL);
211
212         ao2_cleanup(app->topic);
213         app->topic = NULL;
214         ao2_cleanup(app->forwards);
215         app->forwards = NULL;
216         ao2_cleanup(app->data);
217         app->data = NULL;
218 }
219
/*!
 * \brief Default route: relay any message that has a JSON form to the app.
 *
 * \param data The \ref app this router belongs to.
 * \param sub Subscription the message arrived on.
 * \param topic Topic the message was published to (unused).
 * \param message Message to relay.
 *
 * On the subscription's final message this callback also drops the
 * reference the router holds on the app.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_topic *topic, struct stasis_message *message)
{
	struct app *app = data;
	struct ast_json *json;
	int is_final;

	is_final = stasis_subscription_final_message(sub, message);

	/* By default, send any message that has a JSON representation */
	json = stasis_message_to_json(message);
	if (json) {
		app_send(app, json);
		ast_json_unref(json);
	}

	/*
	 * Release the router's reference only once we are done with the app;
	 * dropping it earlier could destroy the app while it is still in use
	 * above.
	 */
	if (is_final) {
		ao2_cleanup(app);
	}
}
238
/*!
 * \brief Signature of a channel snapshot monitor.
 *
 * A monitor is handed the previous and current snapshot of a channel plus
 * the timestamp of the update, and returns a JSON event or \c NULL.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
244
245 static struct ast_json *simple_channel_event(
246         const char *type,
247         struct ast_channel_snapshot *snapshot,
248         const struct timeval *tv)
249 {
250         return ast_json_pack("{s: s, s: o, s: o}",
251                 "type", type,
252                 "timestamp", ast_json_timeval(*tv, NULL),
253                 "channel", ast_channel_snapshot_to_json(snapshot));
254 }
255
/*! \brief Build a "ChannelCreated" event for \a snapshot at time \a tv. */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	static const char event_type[] = "ChannelCreated";

	return simple_channel_event(event_type, snapshot, tv);
}
262
263 static struct ast_json *channel_destroyed_event(
264         struct ast_channel_snapshot *snapshot,
265         const struct timeval *tv)
266 {
267         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
268                 "type", "ChannelDestroyed",
269                 "timestamp", ast_json_timeval(*tv, NULL),
270                 "cause", snapshot->hangupcause,
271                 "cause_txt", ast_cause2str(snapshot->hangupcause),
272                 "channel", ast_channel_snapshot_to_json(snapshot));
273 }
274
/*! \brief Build a "ChannelStateChange" event for \a snapshot at time \a tv. */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	static const char event_type[] = "ChannelStateChange";

	return simple_channel_event(event_type, snapshot, tv);
}
281
282 /*! \brief Handle channel state changes */
283 static struct ast_json *channel_state(
284         struct ast_channel_snapshot *old_snapshot,
285         struct ast_channel_snapshot *new_snapshot,
286         const struct timeval *tv)
287 {
288         struct ast_channel_snapshot *snapshot = new_snapshot ?
289                 new_snapshot : old_snapshot;
290
291         if (!old_snapshot) {
292                 return channel_created_event(snapshot, tv);
293         } else if (!new_snapshot) {
294                 return channel_destroyed_event(snapshot, tv);
295         } else if (old_snapshot->state != new_snapshot->state) {
296                 return channel_state_change_event(snapshot, tv);
297         }
298
299         return NULL;
300 }
301
302 static struct ast_json *channel_dialplan(
303         struct ast_channel_snapshot *old_snapshot,
304         struct ast_channel_snapshot *new_snapshot,
305         const struct timeval *tv)
306 {
307         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
308
309         /* No Newexten event on cache clear or first event */
310         if (!old_snapshot || !new_snapshot) {
311                 return NULL;
312         }
313
314         /* Empty application is not valid for a Newexten event */
315         if (ast_strlen_zero(new_snapshot->appl)) {
316                 return NULL;
317         }
318
319         if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
320                 return NULL;
321         }
322
323         return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
324                 "type", "ChannelDialplan",
325                 "timestamp", ast_json_timeval(*tv, NULL),
326                 "dialplan_app", new_snapshot->appl,
327                 "dialplan_app_data", new_snapshot->data,
328                 "channel", ast_channel_snapshot_to_json(new_snapshot));
329 }
330
331 static struct ast_json *channel_callerid(
332         struct ast_channel_snapshot *old_snapshot,
333         struct ast_channel_snapshot *new_snapshot,
334         const struct timeval *tv)
335 {
336         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
337
338         /* No NewCallerid event on cache clear or first event */
339         if (!old_snapshot || !new_snapshot) {
340                 return NULL;
341         }
342
343         if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
344                 return NULL;
345         }
346
347         return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
348                 "type", "ChannelCallerId",
349                 "timestamp", ast_json_timeval(*tv, NULL),
350                 "caller_presentation", new_snapshot->caller_pres,
351                 "caller_presentation_txt", ast_describe_caller_presentation(
352                         new_snapshot->caller_pres),
353                 "channel", ast_channel_snapshot_to_json(new_snapshot));
354 }
355
356 static channel_snapshot_monitor channel_monitors[] = {
357         channel_state,
358         channel_dialplan,
359         channel_callerid
360 };
361
362 static void sub_channel_update_handler(void *data,
363                 struct stasis_subscription *sub,
364                 struct stasis_topic *topic,
365                 struct stasis_message *message)
366 {
367         struct app *app = data;
368         struct stasis_cache_update *update;
369         struct ast_channel_snapshot *new_snapshot;
370         struct ast_channel_snapshot *old_snapshot;
371         const struct timeval *tv;
372         int i;
373
374         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
375
376         update = stasis_message_data(message);
377
378         ast_assert(update->type == ast_channel_snapshot_type());
379
380         new_snapshot = stasis_message_data(update->new_snapshot);
381         old_snapshot = stasis_message_data(update->old_snapshot);
382
383         /* Pull timestamp from the new snapshot, or from the update message
384          * when there isn't one. */
385         tv = update->new_snapshot ?
386                 stasis_message_timestamp(update->new_snapshot) :
387                 stasis_message_timestamp(message);
388
389         for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
390                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
391
392                 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
393                 if (msg) {
394                         app_send(app, msg);
395                 }
396         }
397 }
398
399 static struct ast_json *simple_bridge_event(
400         const char *type,
401         struct ast_bridge_snapshot *snapshot,
402         const struct timeval *tv)
403 {
404         return ast_json_pack("{s: s, s: o, s: o}",
405                 "type", type,
406                 "timestamp", ast_json_timeval(*tv, NULL),
407                 "bridge", ast_bridge_snapshot_to_json(snapshot));
408 }
409
410 static void sub_bridge_update_handler(void *data,
411                 struct stasis_subscription *sub,
412                 struct stasis_topic *topic,
413                 struct stasis_message *message)
414 {
415         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
416         struct app *app = data;
417         struct stasis_cache_update *update;
418         struct ast_bridge_snapshot *new_snapshot;
419         struct ast_bridge_snapshot *old_snapshot;
420         const struct timeval *tv;
421
422         ast_assert(stasis_message_type(message) == stasis_cache_update_type());
423
424         update = stasis_message_data(message);
425
426         ast_assert(update->type == ast_bridge_snapshot_type());
427
428         new_snapshot = stasis_message_data(update->new_snapshot);
429         old_snapshot = stasis_message_data(update->old_snapshot);
430         tv = update->new_snapshot ?
431                 stasis_message_timestamp(update->new_snapshot) :
432                 stasis_message_timestamp(message);
433
434         if (!new_snapshot) {
435                 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
436         } else if (!old_snapshot) {
437                 json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
438         }
439
440         if (!json) {
441                 return;
442         }
443
444         app_send(app, json);
445 }
446
447 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
448         struct stasis_topic *topic, struct stasis_message *message)
449 {
450         struct app *app = data;
451         struct ast_bridge_merge_message *merge;
452         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
453
454         if (stasis_subscription_final_message(sub, message)) {
455                 ao2_cleanup(app);
456         }
457
458         if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
459                 return;
460         }
461
462         merge = stasis_message_data(message);
463
464         /* Find out if we're subscribed to either bridge */
465         forwards = ao2_find(app->forwards, merge->from->uniqueid,
466                 OBJ_SEARCH_KEY);
467         if (!forwards) {
468                 forwards = ao2_find(app->forwards, merge->to->uniqueid,
469                         OBJ_SEARCH_KEY);
470         }
471
472         if (!forwards) {
473                 return;
474         }
475
476         /* Forward the message to the app */
477         stasis_forward_message(app->topic, topic, message);
478 }
479
480 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
481 {
482         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
483         size_t size;
484         int res = 0;
485
486         ast_assert(name != NULL);
487         ast_assert(handler != NULL);
488
489         ast_verb(1, "Creating Stasis app '%s'\n", name);
490
491         size = sizeof(*app) + strlen(name) + 1;
492         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
493
494         if (!app) {
495                 return NULL;
496         }
497
498         app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
499                 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
500                 forwards_sort, NULL);
501         if (!app->forwards) {
502                 return NULL;
503         }
504
505         app->topic = stasis_topic_create(name);
506         if (!app->topic) {
507                 return NULL;
508         }
509
510         app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
511                 bridge_merge_handler, app);
512         if (!app->bridge_merge_sub) {
513                 return NULL;
514         }
515         /* Subscription holds a reference */
516         ao2_ref(app, +1);
517
518         app->router = stasis_message_router_create(app->topic);
519         if (!app->router) {
520                 return NULL;
521         }
522
523         res |= stasis_message_router_add_cache_update(app->router,
524                 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
525
526         res |= stasis_message_router_add_cache_update(app->router,
527                 ast_channel_snapshot_type(), sub_channel_update_handler, app);
528
529         res |= stasis_message_router_set_default(app->router,
530                 sub_default_handler, app);
531
532         if (res != 0) {
533                 return NULL;
534         }
535         /* Router holds a reference */
536         ao2_ref(app, +1);
537
538         strncpy(app->name, name, size - sizeof(*app));
539         app->handler = handler;
540         ao2_ref(data, +1);
541         app->data = data;
542
543         ao2_ref(app, +1);
544         return app;
545 }
546
547 /*!
548  * \brief Send a message to the given application.
549  * \param app App to send the message to.
550  * \param message Message to send.
551  */
552 void app_send(struct app *app, struct ast_json *message)
553 {
554         stasis_app_cb handler;
555         RAII_VAR(void *, data, NULL, ao2_cleanup);
556
557         /* Copy off mutable state with lock held */
558         {
559                 SCOPED_AO2LOCK(lock, app);
560                 handler = app->handler;
561                 if (app->data) {
562                         ao2_ref(app->data, +1);
563                         data = app->data;
564                 }
565                 /* Name is immutable; no need to copy */
566         }
567
568         if (!handler) {
569                 ast_verb(3,
570                         "Inactive Stasis app '%s' missed message\n", app->name);
571                 return;
572         }
573
574         handler(data, app->name, message);
575 }
576
577 void app_deactivate(struct app *app)
578 {
579         SCOPED_AO2LOCK(lock, app);
580         ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
581         app->handler = NULL;
582         ao2_cleanup(app->data);
583         app->data = NULL;
584 }
585
586 void app_shutdown(struct app *app)
587 {
588         SCOPED_AO2LOCK(lock, app);
589
590         ast_assert(app_is_finished(app));
591
592         stasis_message_router_unsubscribe(app->router);
593         app->router = NULL;
594         stasis_unsubscribe(app->bridge_merge_sub);
595         app->bridge_merge_sub = NULL;
596 }
597
598 int app_is_active(struct app *app)
599 {
600         SCOPED_AO2LOCK(lock, app);
601         return app->handler != NULL;
602 }
603
604 int app_is_finished(struct app *app)
605 {
606         SCOPED_AO2LOCK(lock, app);
607
608         return app->handler == NULL && ao2_container_count(app->forwards) == 0;
609 }
610
611 void app_update(struct app *app, stasis_app_cb handler, void *data)
612 {
613         SCOPED_AO2LOCK(lock, app);
614
615         if (app->handler) {
616                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
617
618                 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
619
620                 msg = ast_json_pack("{s: s, s: s}",
621                         "type", "ApplicationReplaced",
622                         "application", app->name);
623                 if (msg) {
624                         app_send(app, msg);
625                 }
626         } else {
627                 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
628         }
629
630         app->handler = handler;
631         ao2_cleanup(app->data);
632         if (data) {
633                 ao2_ref(data, +1);
634         }
635         app->data = data;
636 }
637
638 const char *app_name(const struct app *app)
639 {
640         return app->name;
641 }
642
643 int app_subscribe_channel(struct app *app, struct ast_channel *chan)
644 {
645         int res;
646
647         if (!app || !chan) {
648                 return -1;
649         } else {
650                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
651                 SCOPED_AO2LOCK(lock, app->forwards);
652
653                 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
654                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
655                 if (!forwards) {
656                         /* Forwards not found, create one */
657                         forwards = forwards_create_channel(app, chan);
658                         if (!forwards) {
659                                 return -1;
660                         }
661
662                         res = ao2_link_flags(app->forwards, forwards,
663                                 OBJ_NOLOCK);
664                         if (!res) {
665                                 return -1;
666                         }
667                 }
668
669                 ++forwards->interested;
670                 return 0;
671         }
672 }
673
674 static int unsubscribe(struct app *app, const char *kind, const char *id)
675 {
676         RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
677         SCOPED_AO2LOCK(lock, app->forwards);
678
679         forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
680         if (!forwards) {
681                 ast_log(LOG_ERROR,
682                         "App '%s' not subscribed to %s '%s'",
683                         app->name, kind, id);
684                 return -1;
685         }
686
687         if (--forwards->interested == 0) {
688                 /* No one is interested any more; unsubscribe */
689                 forwards_unsubscribe(forwards);
690                 ao2_find(app->forwards, forwards,
691                         OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
692                         OBJ_NODATA);
693         }
694
695         return 0;
696 }
697
/*!
 * \brief Drop one subscription of an application to a channel.
 *
 * \param app App to unsubscribe.
 * \param chan Channel to unsubscribe from.
 * \retval 0 on success.
 * \retval -1 on bad arguments or when the app was not subscribed.
 */
int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
	}

	return -1;
}
706
707 int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
708 {
709         if (!app || !bridge) {
710                 return -1;
711         } else {
712                 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
713                 SCOPED_AO2LOCK(lock, app->forwards);
714
715                 forwards = ao2_find(app->forwards, bridge->uniqueid,
716                         OBJ_SEARCH_KEY | OBJ_NOLOCK);
717
718                 if (!forwards) {
719                         /* Forwards not found, create one */
720                         forwards = forwards_create_bridge(app, bridge);
721                         if (!forwards) {
722                                 return -1;
723                         }
724                         ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
725                 }
726
727                 ++forwards->interested;
728                 return 0;
729         }
730 }
731
732 int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
733 {
734         if (!app || !bridge) {
735                 return -1;
736         }
737
738         return unsubscribe(app, "bridge", bridge->uniqueid);
739 }