2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
21 * \brief Stasis application support.
23 * \author David M. Lee, II <dlee@digium.com>
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_message_router.h"
39 /*! Aggregation topic for this application; created in app_create() and released in app_dtor(). */
40 struct stasis_topic *topic;
41 /*! Router for handling messages forwarded to \a topic; asserted to be NULL by the time app_dtor() runs. */
42 struct stasis_message_router *router;
43 /*! Subscription on the all-bridges topic, used to watch for bridge merge messages */
44 struct stasis_subscription *bridge_merge_sub;
45 /*! Container of the channel and bridge forwards to this app's topic, sorted by unique id. */
46 struct ao2_container *forwards;
47 /*! Callback function for this application; NULL while the app is inactive. */
48 stasis_app_cb handler;
49 /*! Opaque data to hand to callback function. */
51 /*! Name of the Stasis application */
55 /*! Subscription info for a particular channel/bridge; stored in an app's \c forwards container. */
57 /*! Count of number of times this channel/bridge has been subscribed; the forwards are torn down when it drops to zero. */
60 /*! Forward for the regular topic */
61 struct stasis_subscription *topic_forward;
62 /*! Forward for the caching topic */
63 struct stasis_subscription *topic_cached_forward;
65 /*! Unique id of the object being forwarded; used as the container sort key. */
/*!
 * \brief Destructor for \c app_forwards objects.
 *
 * Only sanity-checks the object: both forwards must already have been
 * cancelled and cleared (see forwards_unsubscribe()) before the last
 * reference goes away.
 *
 * \param obj The \c app_forwards being destroyed.
 */
69 static void forwards_dtor(void *obj)
72 struct app_forwards *forwards = obj;
/* NOTE(review): the directive opening this conditional is not visible in this
 * excerpt; presumably the local is dev-mode only because just the assertions
 * below use it. */
73 #endif /* AST_DEVMODE */
75 ast_assert(forwards->topic_forward == NULL);
76 ast_assert(forwards->topic_cached_forward == NULL);
/*!
 * \brief Cancel both topic forwards held by \a forwards.
 *
 * Each pointer is reset to NULL right after its subscription is cancelled,
 * which is the state forwards_dtor() asserts on.
 *
 * \param forwards Forwards object to tear down.
 */
79 static void forwards_unsubscribe(struct app_forwards *forwards)
81 stasis_unsubscribe(forwards->topic_forward);
82 forwards->topic_forward = NULL;
83 stasis_unsubscribe(forwards->topic_cached_forward);
84 forwards->topic_cached_forward = NULL;
/*!
 * \brief Allocate an \c app_forwards recording the given unique id.
 *
 * The id is copied into trailing storage sized at allocation time.
 * forwards_create_channel() and forwards_create_bridge() call this and then
 * attach the actual topic forwards.
 *
 * \param app Application the forwards are for; rejected when NULL.
 * \param id Unique id to record; rejected when empty.
 */
87 static struct app_forwards *forwards_create(struct app *app,
90 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
92 if (!app || ast_strlen_zero(id)) {
/* Room for the struct plus the id string and its terminator. */
96 forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
101 strcpy(forwards->id, id);
/* Extra reference for the caller; the scoped cleanup declared above drops the local one. */
103 ao2_ref(forwards, +1);
107 /*!
 * \brief Forward a channel's topics to an app.
 *
 * Creates an \c app_forwards keyed by the channel's unique id, then forwards
 * the channel's regular topic and its cached topic.
 *
 * \param app Application receiving the forwarded messages.
 * \param chan Channel whose topics are forwarded.
 */
108 static struct app_forwards *forwards_create_channel(struct app *app,
109 struct ast_channel *chan)
111 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
117 forwards = forwards_create(app, ast_channel_uniqueid(chan));
122 forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
124 if (!forwards->topic_forward) {
128 forwards->topic_cached_forward = stasis_forward_all(
129 ast_channel_topic_cached(chan), app->topic);
130 if (!forwards->topic_cached_forward) {
131 /* Half-subscribed is a bad thing: undo the first forward and clear it,
 * which is the state forwards_dtor() asserts on. */
132 stasis_unsubscribe(forwards->topic_forward);
133 forwards->topic_forward = NULL;
/* Extra reference for the caller; the scoped cleanup declared above drops the local one. */
137 ao2_ref(forwards, +1);
141 /*!
 * \brief Forward a bridge's topics to an app.
 *
 * Creates an \c app_forwards keyed by the bridge's unique id, then forwards
 * the bridge's regular topic and its cached topic.
 *
 * \param app Application receiving the forwarded messages; checked for NULL.
 * \param bridge Bridge whose topics are forwarded; checked for NULL.
 */
142 static struct app_forwards *forwards_create_bridge(struct app *app,
143 struct ast_bridge *bridge)
145 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
147 if (!app || !bridge) {
151 forwards = forwards_create(app, bridge->uniqueid);
156 forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
158 if (!forwards->topic_forward) {
162 forwards->topic_cached_forward = stasis_forward_all(
163 ast_bridge_topic_cached(bridge), app->topic);
164 if (!forwards->topic_cached_forward) {
165 /* Half-subscribed is a bad thing: undo the first forward and clear it,
 * which is the state forwards_dtor() asserts on. */
166 stasis_unsubscribe(forwards->topic_forward);
167 forwards->topic_forward = NULL;
/* Extra reference for the caller; the scoped cleanup declared above drops the local one. */
171 ao2_ref(forwards, +1);
/*!
 * \brief Sort callback for an app's \c forwards container.
 *
 * Orders \c app_forwards objects by their \c id. The search flags decide how
 * \a obj_right is interpreted: as another \c app_forwards (its \c id is used),
 * as a full key string (strcmp), or as a partial key (prefix comparison).
 *
 * \param obj_left An \c app_forwards from the container.
 * \param obj_right Object or key string to compare against, per \a flags.
 * \param flags Search flags; only the pointer/key/partial-key bits are examined.
 */
175 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
177 const struct app_forwards *object_left = obj_left;
178 const struct app_forwards *object_right = obj_right;
/* Start out treating the right-hand side as a bare key string. */
179 const char *right_key = obj_right;
182 switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
/* Right-hand side is a full object: compare against its id instead. */
184 right_key = object_right->id;
187 cmp = strcmp(object_left->id, right_key);
189 case OBJ_PARTIAL_KEY:
191 * We could also use a partial key struct containing a length
192 * so strlen() does not get called for every comparison instead.
/* Only the first strlen(right_key) characters take part in the comparison. */
194 cmp = strncmp(object_left->id, right_key, strlen(right_key));
197 /* Sort can only work on something with a full or partial key. */
/*!
 * \brief Destructor for \c app objects.
 *
 * Expects the router and the bridge merge subscription to be gone already,
 * then drops the app's references to its topic, forwards container and
 * handler data.
 *
 * \param obj The \c app being destroyed.
 */
205 static void app_dtor(void *obj)
207 struct app *app = obj;
209 ast_verb(1, "Destroying Stasis app %s\n", app->name);
/* Both subscriptions must have been torn down before destruction. */
211 ast_assert(app->router == NULL);
212 ast_assert(app->bridge_merge_sub == NULL);
214 ao2_cleanup(app->topic);
216 ao2_cleanup(app->forwards);
217 app->forwards = NULL;
218 ao2_cleanup(app->data);
/*!
 * \brief Default handler for messages that reach the app's router without a dedicated route.
 *
 * Installed through stasis_message_router_set_default() in app_create().
 * Converts the message to JSON. NOTE(review): what is done with the JSON is
 * not visible in this excerpt; presumably it is handed to app_send().
 *
 * \param data The \c app the router was set up for.
 * \param sub Subscription delivering the message.
 * \param topic Topic the message was published to.
 * \param message Message to handle.
 */
222 static void sub_default_handler(void *data, struct stasis_subscription *sub,
223 struct stasis_topic *topic, struct stasis_message *message)
225 struct app *app = data;
226 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
/* NOTE(review): the body of this branch is not visible here; app_create()
 * notes that the router holds a reference, so presumably it is released
 * on the final message. */
228 if (stasis_subscription_final_message(sub, message)) {
232 /* By default, send any message that has a JSON representation */
233 json = stasis_message_to_json(message);
241 /*!
 * \brief Typedef for callbacks that get called on channel snapshot updates.
 *
 * sub_channel_update_handler() calls every entry of \c channel_monitors with
 * these arguments for each channel cache update.
 *
 * \param old_snapshot Snapshot before the update; the visible monitors allow NULL.
 * \param new_snapshot Snapshot after the update; the visible monitors allow NULL.
 * \param tv Timestamp to put into the generated event.
 * \return JSON event describing the change. NOTE(review): presumably NULL when
 *         the monitor has nothing to report; those return paths are not
 *         visible in this excerpt.
 */
242 typedef struct ast_json *(*channel_snapshot_monitor)(
243 struct ast_channel_snapshot *old_snapshot,
244 struct ast_channel_snapshot *new_snapshot,
245 const struct timeval *tv);
/*!
 * \brief Build a JSON event that carries a timestamp and a channel snapshot.
 *
 * Callers in this file pass the event type name (e.g. "ChannelCreated") as the
 * first argument. NOTE(review): that parameter's declaration and its slot in
 * the pack call are not visible in this excerpt.
 *
 * \param snapshot Channel snapshot to embed under "channel".
 * \param tv Timestamp to embed under "timestamp".
 */
247 static struct ast_json *simple_channel_event(
249 struct ast_channel_snapshot *snapshot,
250 const struct timeval *tv)
252 return ast_json_pack("{s: s, s: o, s: o}",
254 "timestamp", ast_json_timeval(*tv, NULL),
255 "channel", ast_channel_snapshot_to_json(snapshot));
/*!
 * \brief Build the "ChannelCreated" event for a channel snapshot.
 *
 * \param snapshot Snapshot of the newly created channel.
 * \param tv Timestamp for the event.
 */
258 static struct ast_json *channel_created_event(
259 struct ast_channel_snapshot *snapshot,
260 const struct timeval *tv)
262 return simple_channel_event("ChannelCreated", snapshot, tv);
/*!
 * \brief Build the "ChannelDestroyed" event for a channel snapshot.
 *
 * In addition to the timestamp and channel, the event carries the snapshot's
 * hangup cause both as a number ("cause") and as text ("cause_txt").
 *
 * \param snapshot Last snapshot of the destroyed channel.
 * \param tv Timestamp for the event.
 */
265 static struct ast_json *channel_destroyed_event(
266 struct ast_channel_snapshot *snapshot,
267 const struct timeval *tv)
269 return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
270 "type", "ChannelDestroyed",
271 "timestamp", ast_json_timeval(*tv, NULL),
272 "cause", snapshot->hangupcause,
273 "cause_txt", ast_cause2str(snapshot->hangupcause),
274 "channel", ast_channel_snapshot_to_json(snapshot));
/*!
 * \brief Build the "ChannelStateChange" event for a channel snapshot.
 *
 * \param snapshot Snapshot reflecting the channel's new state.
 * \param tv Timestamp for the event.
 */
277 static struct ast_json *channel_state_change_event(
278 struct ast_channel_snapshot *snapshot,
279 const struct timeval *tv)
281 return simple_channel_event("ChannelStateChange", snapshot, tv);
284 /*!
 * \brief Handle channel state changes.
 *
 * Picks the created, destroyed or state-change event depending on which
 * snapshots exist and whether the state differs between them.
 *
 * \param old_snapshot Snapshot before the update, if any.
 * \param new_snapshot Snapshot after the update, if any.
 * \param tv Timestamp for the event.
 */
285 static struct ast_json *channel_state(
286 struct ast_channel_snapshot *old_snapshot,
287 struct ast_channel_snapshot *new_snapshot,
288 const struct timeval *tv)
/* Report on the new snapshot when there is one, otherwise on the old one. */
290 struct ast_channel_snapshot *snapshot = new_snapshot ?
291 new_snapshot : old_snapshot;
/* NOTE(review): the condition guarding this first return is not visible in
 * this excerpt; presumably it tests for a missing old snapshot. */
294 return channel_created_event(snapshot, tv);
295 } else if (!new_snapshot) {
296 return channel_destroyed_event(snapshot, tv);
297 } else if (old_snapshot->state != new_snapshot->state) {
298 return channel_state_change_event(snapshot, tv);
/*!
 * \brief Build a "ChannelDialplan" event when a channel's dialplan location changes.
 *
 * Nothing is built unless both snapshots exist, the new snapshot names an
 * application, and ast_channel_snapshot_cep_equal() does not consider the two
 * snapshots equal.
 *
 * \param old_snapshot Snapshot before the update, if any.
 * \param new_snapshot Snapshot after the update, if any.
 * \param tv Timestamp for the event.
 */
304 static struct ast_json *channel_dialplan(
305 struct ast_channel_snapshot *old_snapshot,
306 struct ast_channel_snapshot *new_snapshot,
307 const struct timeval *tv)
/* NOTE(review): no visible line references json; possibly an unused local. */
309 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
311 /* No Newexten event on cache clear or first event */
312 if (!old_snapshot || !new_snapshot) {
316 /* Empty application is not valid for a Newexten event */
317 if (ast_strlen_zero(new_snapshot->appl)) {
321 if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
325 return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
326 "type", "ChannelDialplan",
327 "timestamp", ast_json_timeval(*tv, NULL),
328 "dialplan_app", new_snapshot->appl,
329 "dialplan_app_data", new_snapshot->data,
330 "channel", ast_channel_snapshot_to_json(new_snapshot));
/*!
 * \brief Build a "ChannelCallerId" event when a channel's caller id changes.
 *
 * Nothing is built unless both snapshots exist and
 * ast_channel_snapshot_caller_id_equal() does not consider them equal. The
 * event carries the new caller presentation as a number and as text.
 *
 * \param old_snapshot Snapshot before the update, if any.
 * \param new_snapshot Snapshot after the update, if any.
 * \param tv Timestamp for the event.
 */
333 static struct ast_json *channel_callerid(
334 struct ast_channel_snapshot *old_snapshot,
335 struct ast_channel_snapshot *new_snapshot,
336 const struct timeval *tv)
/* NOTE(review): no visible line references json; possibly an unused local. */
338 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
340 /* No NewCallerid event on cache clear or first event */
341 if (!old_snapshot || !new_snapshot) {
345 if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
349 return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
350 "type", "ChannelCallerId",
351 "timestamp", ast_json_timeval(*tv, NULL),
352 "caller_presentation", new_snapshot->caller_pres,
353 "caller_presentation_txt", ast_describe_caller_presentation(
354 new_snapshot->caller_pres),
355 "channel", ast_channel_snapshot_to_json(new_snapshot));
/*!
 * \brief Table of channel snapshot monitors.
 *
 * sub_channel_update_handler() runs every entry for each channel cache
 * update. NOTE(review): the entries themselves are not visible in this
 * excerpt.
 */
358 static channel_snapshot_monitor channel_monitors[] = {
/*!
 * \brief Handle channel snapshot cache updates routed to this app.
 *
 * Registered for channel snapshot cache updates in app_create(). Extracts the
 * old and new snapshots from the update and runs every monitor in
 * \c channel_monitors over them.
 *
 * \param data The \c app the router was set up for.
 * \param sub Subscription delivering the message.
 * \param topic Topic the message was published to.
 * \param message Cache update message.
 */
364 static void sub_channel_update_handler(void *data,
365 struct stasis_subscription *sub,
366 struct stasis_topic *topic,
367 struct stasis_message *message)
369 struct app *app = data;
370 struct stasis_cache_update *update;
371 struct ast_channel_snapshot *new_snapshot;
372 struct ast_channel_snapshot *old_snapshot;
373 const struct timeval *tv;
376 ast_assert(stasis_message_type(message) == stasis_cache_update_type());
378 update = stasis_message_data(message);
380 ast_assert(update->type == ast_channel_snapshot_type());
382 new_snapshot = stasis_message_data(update->new_snapshot);
383 old_snapshot = stasis_message_data(update->old_snapshot);
385 /* Pull timestamp from the new snapshot, or from the update message
386 * when there isn't one. */
387 tv = update->new_snapshot ?
388 stasis_message_timestamp(update->new_snapshot) :
389 stasis_message_timestamp(message);
/* Every monitor gets a look at the update; each result is released per iteration. */
391 for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
392 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
394 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
/*!
 * \brief Build a JSON event that carries a timestamp and a bridge snapshot.
 *
 * Callers in this file pass the event type name (e.g. "BridgeCreated") as the
 * first argument. NOTE(review): that parameter's declaration and its slot in
 * the pack call are not visible in this excerpt.
 *
 * \param snapshot Bridge snapshot to embed under "bridge".
 * \param tv Timestamp to embed under "timestamp".
 */
401 static struct ast_json *simple_bridge_event(
403 struct ast_bridge_snapshot *snapshot,
404 const struct timeval *tv)
406 return ast_json_pack("{s: s, s: o, s: o}",
408 "timestamp", ast_json_timeval(*tv, NULL),
409 "bridge", ast_bridge_snapshot_to_json(snapshot));
/*!
 * \brief Handle bridge snapshot cache updates routed to this app.
 *
 * Registered for bridge snapshot cache updates in app_create(). Builds a
 * "BridgeDestroyed" event from the old snapshot, or a "BridgeCreated" event
 * from the new snapshot when there is no old one.
 *
 * \param data The \c app the router was set up for.
 * \param sub Subscription delivering the message.
 * \param topic Topic the message was published to.
 * \param message Cache update message.
 */
412 static void sub_bridge_update_handler(void *data,
413 struct stasis_subscription *sub,
414 struct stasis_topic *topic,
415 struct stasis_message *message)
417 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
418 struct app *app = data;
419 struct stasis_cache_update *update;
420 struct ast_bridge_snapshot *new_snapshot;
421 struct ast_bridge_snapshot *old_snapshot;
422 const struct timeval *tv;
424 ast_assert(stasis_message_type(message) == stasis_cache_update_type());
426 update = stasis_message_data(message);
428 ast_assert(update->type == ast_bridge_snapshot_type());
430 new_snapshot = stasis_message_data(update->new_snapshot);
431 old_snapshot = stasis_message_data(update->old_snapshot);
/* Use the new snapshot's timestamp, or the update message's when there is none. */
432 tv = update->new_snapshot ?
433 stasis_message_timestamp(update->new_snapshot) :
434 stasis_message_timestamp(message);
437 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
438 } else if (!old_snapshot) {
/* In this branch old_snapshot is NULL, so the created bridge can only be
 * described by the new snapshot. */
439 json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
/*!
 * \brief Pass bridge merge messages on to the app's topic.
 *
 * Subscribed to the all-bridges topic in app_create(). Looks up the app's
 * forwards for the merge's source bridge and for its destination bridge, and
 * forwards the message to the app's topic.
 *
 * \param data The \c app the subscription was created for.
 * \param sub Subscription delivering the message.
 * \param topic Topic the message was published to.
 * \param message Message to inspect.
 */
449 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
450 struct stasis_topic *topic, struct stasis_message *message)
452 struct app *app = data;
453 struct ast_bridge_merge_message *merge;
454 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
/* NOTE(review): the body of this branch is not visible here; app_create()
 * notes that the subscription holds a reference, so presumably it is
 * released on the final message. */
456 if (stasis_subscription_final_message(sub, message)) {
/* Anything other than a bridge merge message is singled out by this check. */
460 if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
464 merge = stasis_message_data(message);
466 /* Find out if we're subscribed to either bridge */
467 forwards = ao2_find(app->forwards, merge->from->uniqueid,
470 forwards = ao2_find(app->forwards, merge->to->uniqueid,
478 /* Forward the message to the app */
479 stasis_forward_message(app->topic, topic, message);
/*!
 * \brief Create a Stasis application object.
 *
 * Sets up the forwards container, the aggregation topic, the bridge merge
 * subscription and the message router with its cache-update and default
 * handlers, then records the name and handler.
 *
 * \param name Application name; asserted non-NULL and copied into the object.
 * \param handler Callback for the application; asserted non-NULL.
 * \param data Opaque data for \a handler. NOTE(review): the line storing it is
 *        not visible in this excerpt.
 */
482 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
484 RAII_VAR(struct app *, app, NULL, ao2_cleanup);
488 ast_assert(name != NULL);
489 ast_assert(handler != NULL);
491 ast_verb(1, "Creating Stasis app '%s'\n", name);
/* The name lives in trailing storage, so size the allocation for it. */
493 size = sizeof(*app) + strlen(name) + 1;
494 app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
/* Sorted by forwards_sort(); duplicate objects are rejected. */
500 app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
501 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
502 forwards_sort, NULL);
503 if (!app->forwards) {
507 app->topic = stasis_topic_create(name);
/* Merges are watched on the all-bridges topic rather than per bridge. */
512 app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
513 bridge_merge_handler, app);
514 if (!app->bridge_merge_sub) {
517 /* Subscription holds a reference */
520 app->router = stasis_message_router_create(app->topic);
/* Accumulate the registration results so that any failure shows up in res. */
525 res |= stasis_message_router_add_cache_update(app->router,
526 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
528 res |= stasis_message_router_add_cache_update(app->router,
529 ast_channel_snapshot_type(), sub_channel_update_handler, app);
531 res |= stasis_message_router_set_default(app->router,
532 sub_default_handler, app);
537 /* Router holds a reference */
/* size - sizeof(*app) is strlen(name) + 1, so the terminator is copied too. */
540 strncpy(app->name, name, size - sizeof(*app));
541 app->handler = handler;
550 * \brief Send a message to the given application.
551 * \param app App to send the message to.
552 * \param message Message to send.
/* Snapshots the handler and a reference to its data while holding the app
 * lock, then calls the handler with the app's name and the message. */
554 void app_send(struct app *app, struct ast_json *message)
556 stasis_app_cb handler;
/* Released automatically when the function returns. */
557 RAII_VAR(void *, data, NULL, ao2_cleanup);
559 /* Copy off mutable state with lock held */
561 SCOPED_AO2LOCK(lock, app);
562 handler = app->handler;
/* Keep the data alive for the duration of the callback. */
564 ao2_ref(app->data, +1);
567 /* Name is immutable; no need to copy */
/* NOTE(review): the call this message belongs to and its guarding condition
 * are not visible in this excerpt; presumably it runs when no handler is set. */
572 "Inactive Stasis app '%s' missed message\n", app->name);
576 handler(data, app->name, message);
/*!
 * \brief Deactivate a Stasis application.
 *
 * Under the app lock, logs the deactivation and releases the app's reference
 * to its handler data. NOTE(review): the lines resetting the handler and data
 * pointers are not visible in this excerpt.
 *
 * \param app Application to deactivate.
 */
579 void app_deactivate(struct app *app)
581 SCOPED_AO2LOCK(lock, app);
582 ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
584 ao2_cleanup(app->data);
/*!
 * \brief Tear down an application's router and bridge merge subscription.
 *
 * Must only be called once the app is finished (asserted). Clearing
 * \c bridge_merge_sub here satisfies the matching assertion in app_dtor().
 *
 * \param app Application to shut down.
 */
588 void app_shutdown(struct app *app)
590 SCOPED_AO2LOCK(lock, app);
/* NOTE(review): app_is_finished() takes the app lock again while it is
 * already held here; this relies on the lock being recursive -- TODO confirm. */
592 ast_assert(app_is_finished(app));
594 stasis_message_router_unsubscribe(app->router);
596 stasis_unsubscribe(app->bridge_merge_sub);
597 app->bridge_merge_sub = NULL;
/*!
 * \brief Check whether an application currently has a handler.
 *
 * \param app Application to check.
 * \return Non-zero when a handler is set, zero otherwise. Read under the app lock.
 */
600 int app_is_active(struct app *app)
602 SCOPED_AO2LOCK(lock, app);
603 return app->handler != NULL;
/*!
 * \brief Check whether an application has no handler and no remaining forwards.
 *
 * \param app Application to check.
 * \return Non-zero when the handler is NULL and the forwards container is
 *         empty, zero otherwise. Evaluated under the app lock.
 */
606 int app_is_finished(struct app *app)
608 SCOPED_AO2LOCK(lock, app);
610 return app->handler == NULL && ao2_container_count(app->forwards) == 0;
/*!
 * \brief Install a new handler on an existing application.
 *
 * Under the app lock, logs either a replacement or an activation, stores the
 * new handler and releases the reference to the previous handler data. On
 * replacement an "ApplicationReplaced" event naming the app is built.
 * NOTE(review): the condition choosing between the two paths, what is done
 * with the event, and the line storing \a data are not visible in this excerpt.
 *
 * \param app Application to update.
 * \param handler New callback.
 * \param data New opaque data for \a handler.
 */
613 void app_update(struct app *app, stasis_app_cb handler, void *data)
615 SCOPED_AO2LOCK(lock, app);
618 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
620 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
622 msg = ast_json_pack("{s: s, s: s}",
623 "type", "ApplicationReplaced",
624 "application", app->name);
629 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
632 app->handler = handler;
/* Drop the reference to the data that belonged to the previous handler. */
633 ao2_cleanup(app->data);
/*!
 * \brief Accessor for an application's name.
 *
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * returns \c app->name.
 *
 * \param app Application to query.
 */
640 const char *app_name(const struct app *app)
/*!
 * \brief Subscribe an application to a channel's topics.
 *
 * Under the forwards container lock, looks up the forwards for the channel's
 * unique id, creates and links new forwards when needed, and bumps the
 * \c interested count.
 *
 * \param app Application to subscribe.
 * \param chan Channel to subscribe to.
 */
645 int app_subscribe_channel(struct app *app, struct ast_channel *chan)
652 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
/* The container is locked by hand, hence OBJ_NOLOCK on the lookup below. */
653 SCOPED_AO2LOCK(lock, app->forwards);
655 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
656 OBJ_SEARCH_KEY | OBJ_NOLOCK);
658 /* Forwards not found, create one */
659 forwards = forwards_create_channel(app, chan);
664 res = ao2_link_flags(app->forwards, forwards,
/* One more subscriber for this channel. */
671 ++forwards->interested;
/*!
 * \brief Drop one subscription to the object with the given id.
 *
 * Under the forwards container lock, looks up the forwards for \a id and
 * decrements its \c interested count. When the count reaches zero, the topic
 * forwards are cancelled and the entry is unlinked from the container.
 *
 * \param app Application to unsubscribe.
 * \param kind Kind of object ("channel" or "bridge" from the visible callers); used in the message only.
 * \param id Unique id of the object.
 */
676 static int unsubscribe(struct app *app, const char *kind, const char *id)
678 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
679 SCOPED_AO2LOCK(lock, app->forwards);
681 forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
/* NOTE(review): unlike the other messages in this file, this format string
 * has no trailing newline; confirm whether the call it belongs to (not
 * visible here) expects one. */
684 "App '%s' not subscribed to %s '%s'",
685 app->name, kind, id);
689 if (--forwards->interested == 0) {
690 /* No one is interested any more; unsubscribe */
691 forwards_unsubscribe(forwards);
/* Remove the entry from the container as well. */
692 ao2_find(app->forwards, forwards,
693 OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
/*!
 * \brief Unsubscribe an application from a channel's topics.
 *
 * Delegates to unsubscribe() with the channel's unique id.
 *
 * \param app Application to unsubscribe.
 * \param chan Channel to unsubscribe from.
 * \return Whatever unsubscribe() returns.
 */
700 int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
706 return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
/*!
 * \brief Subscribe an application to a bridge's topics.
 *
 * Under the forwards container lock, looks up the forwards for the bridge's
 * unique id, creates and links new forwards when needed, and bumps the
 * \c interested count.
 *
 * \param app Application to subscribe; checked for NULL.
 * \param bridge Bridge to subscribe to; checked for NULL.
 */
709 int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
711 if (!app || !bridge) {
714 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
/* The container is locked by hand, hence OBJ_NOLOCK on the calls below. */
715 SCOPED_AO2LOCK(lock, app->forwards);
717 forwards = ao2_find(app->forwards, bridge->uniqueid,
718 OBJ_SEARCH_KEY | OBJ_NOLOCK);
721 /* Forwards not found, create one */
722 forwards = forwards_create_bridge(app, bridge);
/* NOTE(review): the link result is ignored here, whereas
 * app_subscribe_channel() captures it in res; consider checking it. */
726 ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
/* One more subscriber for this bridge. */
729 ++forwards->interested;
/*!
 * \brief Unsubscribe an application from a bridge's topics.
 *
 * Delegates to unsubscribe() with the bridge's unique id.
 *
 * \param app Application to unsubscribe; checked for NULL.
 * \param bridge Bridge to unsubscribe from; checked for NULL.
 */
734 int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
736 if (!app || !bridge) {
740 return unsubscribe(app, "bridge", bridge->uniqueid);