2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
21 * \brief Stasis application support.
23 * \author David M. Lee, II <dlee@digium.com>
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
32 #include "asterisk/callerid.h"
33 #include "asterisk/stasis_app.h"
34 #include "asterisk/stasis_bridges.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_endpoints.h"
37 #include "asterisk/stasis_message_router.h"
40 /*! Aggregation topic for this application. */
41 struct stasis_topic *topic;
42 /*! Router for handling messages forwarded to \a topic. */
43 struct stasis_message_router *router;
44 /*! Subscription to watch for bridge merge messages */
45 struct stasis_subscription *bridge_merge_sub;
46 /*! Container of the channel forwards to this app's topic. */
47 struct ao2_container *forwards;
48 /*! Callback function for this application. */
49 stasis_app_cb handler;
50 /*! Opaque data to hand to callback function. */
52 /*! Name of the Stasis application */
62 /*! Subscription info for a particular channel/bridge. */
64 /*! Count of number of times this channel/bridge has been subscribed */
67 /*! Forward for the regular topic */
68 struct stasis_forward *topic_forward;
69 /*! Forward for the caching topic */
70 struct stasis_forward *topic_cached_forward;
72 /* Type of object being forwarded */
73 enum forward_type forward_type;
74 /*! Unique id of the object being forwarded */
/*!
 * \brief Destructor callback for an app_forwards object.
 *
 * Performs no cleanup of its own; it only asserts that both topic forwards
 * have already been reset to NULL before the object goes away.
 *
 * \param obj The app_forwards object being destroyed.
 */
static void forwards_dtor(void *obj)
{
#ifdef AST_DEVMODE
	/* Only declared in dev mode - presumably ast_assert() compiles away
	 * otherwise, which would leave this local unused; confirm. */
	struct app_forwards *forwards = obj;
#endif /* AST_DEVMODE */

	ast_assert(forwards->topic_forward == NULL);
	ast_assert(forwards->topic_cached_forward == NULL);
}
/*!
 * \brief Cancel both topic forwards held by \a forwards.
 *
 * Each pointer is reset to NULL right after its forward is cancelled, which
 * is the state forwards_dtor() asserts on.
 *
 * \param forwards Forwards object whose regular and cached forwards are cancelled.
 */
static void forwards_unsubscribe(struct app_forwards *forwards)
{
	stasis_forward_cancel(forwards->topic_forward);
	forwards->topic_forward = NULL;
	stasis_forward_cancel(forwards->topic_cached_forward);
	forwards->topic_cached_forward = NULL;
}
96 static struct app_forwards *forwards_create(struct app *app,
99 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
101 if (!app || ast_strlen_zero(id)) {
105 forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
110 strcpy(forwards->id, id);
112 ao2_ref(forwards, +1);
116 /*! Forward a channel's topics to an app */
117 static struct app_forwards *forwards_create_channel(struct app *app,
118 struct ast_channel *chan)
120 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
126 forwards = forwards_create(app, ast_channel_uniqueid(chan));
131 forwards->forward_type = FORWARD_CHANNEL;
132 forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
134 if (!forwards->topic_forward) {
138 forwards->topic_cached_forward = stasis_forward_all(
139 ast_channel_topic_cached(chan), app->topic);
140 if (!forwards->topic_cached_forward) {
141 /* Half-subscribed is a bad thing */
142 stasis_forward_cancel(forwards->topic_forward);
143 forwards->topic_forward = NULL;
147 ao2_ref(forwards, +1);
151 /*! Forward a bridge's topics to an app */
152 static struct app_forwards *forwards_create_bridge(struct app *app,
153 struct ast_bridge *bridge)
155 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
157 if (!app || !bridge) {
161 forwards = forwards_create(app, bridge->uniqueid);
166 forwards->forward_type = FORWARD_BRIDGE;
167 forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
169 if (!forwards->topic_forward) {
173 forwards->topic_cached_forward = stasis_forward_all(
174 ast_bridge_topic_cached(bridge), app->topic);
175 if (!forwards->topic_cached_forward) {
176 /* Half-subscribed is a bad thing */
177 stasis_forward_cancel(forwards->topic_forward);
178 forwards->topic_forward = NULL;
182 ao2_ref(forwards, +1);
186 /*! Forward a endpoint's topics to an app */
187 static struct app_forwards *forwards_create_endpoint(struct app *app,
188 struct ast_endpoint *endpoint)
190 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
192 if (!app || !endpoint) {
196 forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
201 forwards->forward_type = FORWARD_ENDPOINT;
202 forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
204 if (!forwards->topic_forward) {
208 forwards->topic_cached_forward = stasis_forward_all(
209 ast_endpoint_topic_cached(endpoint), app->topic);
210 if (!forwards->topic_cached_forward) {
211 /* Half-subscribed is a bad thing */
212 stasis_forward_cancel(forwards->topic_forward);
213 forwards->topic_forward = NULL;
217 ao2_ref(forwards, +1);
221 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
223 const struct app_forwards *object_left = obj_left;
224 const struct app_forwards *object_right = obj_right;
225 const char *right_key = obj_right;
228 switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
230 right_key = object_right->id;
233 cmp = strcmp(object_left->id, right_key);
235 case OBJ_PARTIAL_KEY:
237 * We could also use a partial key struct containing a length
238 * so strlen() does not get called for every comparison instead.
240 cmp = strncmp(object_left->id, right_key, strlen(right_key));
243 /* Sort can only work on something with a full or partial key. */
251 static void app_dtor(void *obj)
253 struct app *app = obj;
255 ast_verb(1, "Destroying Stasis app %s\n", app->name);
257 ast_assert(app->router == NULL);
258 ast_assert(app->bridge_merge_sub == NULL);
260 ao2_cleanup(app->topic);
262 ao2_cleanup(app->forwards);
263 app->forwards = NULL;
264 ao2_cleanup(app->data);
268 static void sub_default_handler(void *data, struct stasis_subscription *sub,
269 struct stasis_message *message)
271 struct app *app = data;
272 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
274 if (stasis_subscription_final_message(sub, message)) {
278 /* By default, send any message that has a JSON representation */
279 json = stasis_message_to_json(message);
/*!
 * \brief Typedef for callbacks that get called on channel snapshot updates.
 *
 * \param old_snapshot Snapshot before the update; the monitors in this file
 *        treat NULL as "no previous snapshot".
 * \param new_snapshot Snapshot after the update; the monitors in this file
 *        treat NULL as "snapshot removed".
 * \param tv Timestamp to report in the generated event.
 * \return A JSON event, or NULL when the monitor has nothing to report.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
/*!
 * \brief Build a channel event that carries only a type, timestamp and channel.
 *
 * \param type Value for the event's "type" field.
 * \param snapshot Channel snapshot serialized into the "channel" field.
 * \param tv Timestamp serialized into the "timestamp" field.
 * \return The packed JSON object as returned by ast_json_pack().
 */
static struct ast_json *simple_channel_event(
	const char *type,
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	return ast_json_pack("{s: s, s: o, s: o}",
		"type", type,
		"timestamp", ast_json_timeval(*tv, NULL),
		"channel", ast_channel_snapshot_to_json(snapshot));
}
/*!
 * \brief Build a "ChannelCreated" event for \a snapshot.
 *
 * \param snapshot Snapshot of the newly created channel.
 * \param tv Timestamp for the event.
 * \return Result of simple_channel_event().
 */
static struct ast_json *channel_created_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	return simple_channel_event("ChannelCreated", snapshot, tv);
}
/*!
 * \brief Build a "ChannelDestroyed" event for \a snapshot.
 *
 * In addition to the type, timestamp and channel, the event carries the
 * snapshot's hangup cause both as a number ("cause") and as the text
 * returned by ast_cause2str() ("cause_txt").
 *
 * \param snapshot Last snapshot of the destroyed channel.
 * \param tv Timestamp for the event.
 * \return The packed JSON object as returned by ast_json_pack().
 */
static struct ast_json *channel_destroyed_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
		"type", "ChannelDestroyed",
		"timestamp", ast_json_timeval(*tv, NULL),
		"cause", snapshot->hangupcause,
		"cause_txt", ast_cause2str(snapshot->hangupcause),
		"channel", ast_channel_snapshot_to_json(snapshot));
}
/*!
 * \brief Build a "ChannelStateChange" event for \a snapshot.
 *
 * \param snapshot Snapshot of the channel after the state change.
 * \param tv Timestamp for the event.
 * \return Result of simple_channel_event().
 */
static struct ast_json *channel_state_change_event(
	struct ast_channel_snapshot *snapshot,
	const struct timeval *tv)
{
	return simple_channel_event("ChannelStateChange", snapshot, tv);
}
330 /*! \brief Handle channel state changes */
331 static struct ast_json *channel_state(
332 struct ast_channel_snapshot *old_snapshot,
333 struct ast_channel_snapshot *new_snapshot,
334 const struct timeval *tv)
336 struct ast_channel_snapshot *snapshot = new_snapshot ?
337 new_snapshot : old_snapshot;
340 return channel_created_event(snapshot, tv);
341 } else if (!new_snapshot) {
342 return channel_destroyed_event(snapshot, tv);
343 } else if (old_snapshot->state != new_snapshot->state) {
344 return channel_state_change_event(snapshot, tv);
350 static struct ast_json *channel_dialplan(
351 struct ast_channel_snapshot *old_snapshot,
352 struct ast_channel_snapshot *new_snapshot,
353 const struct timeval *tv)
355 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
357 /* No Newexten event on cache clear or first event */
358 if (!old_snapshot || !new_snapshot) {
362 /* Empty application is not valid for a Newexten event */
363 if (ast_strlen_zero(new_snapshot->appl)) {
367 if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
371 return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
372 "type", "ChannelDialplan",
373 "timestamp", ast_json_timeval(*tv, NULL),
374 "dialplan_app", new_snapshot->appl,
375 "dialplan_app_data", new_snapshot->data,
376 "channel", ast_channel_snapshot_to_json(new_snapshot));
379 static struct ast_json *channel_callerid(
380 struct ast_channel_snapshot *old_snapshot,
381 struct ast_channel_snapshot *new_snapshot,
382 const struct timeval *tv)
384 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
386 /* No NewCallerid event on cache clear or first event */
387 if (!old_snapshot || !new_snapshot) {
391 if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
395 return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
396 "type", "ChannelCallerId",
397 "timestamp", ast_json_timeval(*tv, NULL),
398 "caller_presentation", new_snapshot->caller_pres,
399 "caller_presentation_txt", ast_describe_caller_presentation(
400 new_snapshot->caller_pres),
401 "channel", ast_channel_snapshot_to_json(new_snapshot));
404 static channel_snapshot_monitor channel_monitors[] = {
410 static void sub_channel_update_handler(void *data,
411 struct stasis_subscription *sub,
412 struct stasis_message *message)
414 struct app *app = data;
415 struct stasis_cache_update *update;
416 struct ast_channel_snapshot *new_snapshot;
417 struct ast_channel_snapshot *old_snapshot;
418 const struct timeval *tv;
421 ast_assert(stasis_message_type(message) == stasis_cache_update_type());
423 update = stasis_message_data(message);
425 ast_assert(update->type == ast_channel_snapshot_type());
427 new_snapshot = stasis_message_data(update->new_snapshot);
428 old_snapshot = stasis_message_data(update->old_snapshot);
430 /* Pull timestamp from the new snapshot, or from the update message
431 * when there isn't one. */
432 tv = update->new_snapshot ?
433 stasis_message_timestamp(update->new_snapshot) :
434 stasis_message_timestamp(message);
436 for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
437 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
439 msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
/*!
 * \brief Build an endpoint event that carries a type, timestamp and endpoint.
 *
 * \param type Value for the event's "type" field.
 * \param snapshot Endpoint snapshot serialized into the "endpoint" field.
 * \param tv Timestamp serialized into the "timestamp" field.
 * \return The packed JSON object as returned by ast_json_pack().
 */
static struct ast_json *simple_endpoint_event(
	const char *type,
	struct ast_endpoint_snapshot *snapshot,
	const struct timeval *tv)
{
	return ast_json_pack("{s: s, s: o, s: o}",
		"type", type,
		"timestamp", ast_json_timeval(*tv, NULL),
		"endpoint", ast_endpoint_snapshot_to_json(snapshot));
}
457 static void sub_endpoint_update_handler(void *data,
458 struct stasis_subscription *sub,
459 struct stasis_message *message)
461 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
462 struct app *app = data;
463 struct stasis_cache_update *update;
464 struct ast_endpoint_snapshot *new_snapshot;
465 const struct timeval *tv;
467 ast_assert(stasis_message_type(message) == stasis_cache_update_type());
469 update = stasis_message_data(message);
471 ast_assert(update->type == ast_endpoint_snapshot_type());
473 new_snapshot = stasis_message_data(update->new_snapshot);
474 tv = update->new_snapshot ?
475 stasis_message_timestamp(update->new_snapshot) :
476 stasis_message_timestamp(message);
478 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
/*!
 * \brief Build a bridge event that carries a type, timestamp and bridge.
 *
 * \param type Value for the event's "type" field.
 * \param snapshot Bridge snapshot serialized into the "bridge" field.
 * \param tv Timestamp serialized into the "timestamp" field.
 * \return The packed JSON object as returned by ast_json_pack().
 */
static struct ast_json *simple_bridge_event(
	const char *type,
	struct ast_bridge_snapshot *snapshot,
	const struct timeval *tv)
{
	return ast_json_pack("{s: s, s: o, s: o}",
		"type", type,
		"timestamp", ast_json_timeval(*tv, NULL),
		"bridge", ast_bridge_snapshot_to_json(snapshot));
}
498 static void sub_bridge_update_handler(void *data,
499 struct stasis_subscription *sub,
500 struct stasis_message *message)
502 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
503 struct app *app = data;
504 struct stasis_cache_update *update;
505 struct ast_bridge_snapshot *new_snapshot;
506 struct ast_bridge_snapshot *old_snapshot;
507 const struct timeval *tv;
509 ast_assert(stasis_message_type(message) == stasis_cache_update_type());
511 update = stasis_message_data(message);
513 ast_assert(update->type == ast_bridge_snapshot_type());
515 new_snapshot = stasis_message_data(update->new_snapshot);
516 old_snapshot = stasis_message_data(update->old_snapshot);
517 tv = update->new_snapshot ?
518 stasis_message_timestamp(update->new_snapshot) :
519 stasis_message_timestamp(message);
522 json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
523 } else if (!old_snapshot) {
524 json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
534 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
535 struct stasis_message *message)
537 struct app *app = data;
538 struct ast_bridge_merge_message *merge;
539 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
541 if (stasis_subscription_final_message(sub, message)) {
545 if (stasis_message_type(message) != ast_bridge_merge_message_type()) {
549 merge = stasis_message_data(message);
551 /* Find out if we're subscribed to either bridge */
552 forwards = ao2_find(app->forwards, merge->from->uniqueid,
555 forwards = ao2_find(app->forwards, merge->to->uniqueid,
563 /* Forward the message to the app */
564 stasis_publish(app->topic, message);
567 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
569 RAII_VAR(struct app *, app, NULL, ao2_cleanup);
573 ast_assert(name != NULL);
574 ast_assert(handler != NULL);
576 ast_verb(1, "Creating Stasis app '%s'\n", name);
578 size = sizeof(*app) + strlen(name) + 1;
579 app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
585 app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
586 AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
587 forwards_sort, NULL);
588 if (!app->forwards) {
592 app->topic = stasis_topic_create(name);
597 app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(),
598 bridge_merge_handler, app);
599 if (!app->bridge_merge_sub) {
602 /* Subscription holds a reference */
605 app->router = stasis_message_router_create(app->topic);
610 res |= stasis_message_router_add_cache_update(app->router,
611 ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
613 res |= stasis_message_router_add_cache_update(app->router,
614 ast_channel_snapshot_type(), sub_channel_update_handler, app);
616 res |= stasis_message_router_add_cache_update(app->router,
617 ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
619 res |= stasis_message_router_set_default(app->router,
620 sub_default_handler, app);
625 /* Router holds a reference */
628 strncpy(app->name, name, size - sizeof(*app));
629 app->handler = handler;
638 * \brief Send a message to the given application.
639 * \param app App to send the message to.
640 * \param message Message to send.
642 void app_send(struct app *app, struct ast_json *message)
644 stasis_app_cb handler;
645 RAII_VAR(void *, data, NULL, ao2_cleanup);
647 /* Copy off mutable state with lock held */
649 SCOPED_AO2LOCK(lock, app);
650 handler = app->handler;
652 ao2_ref(app->data, +1);
655 /* Name is immutable; no need to copy */
660 "Inactive Stasis app '%s' missed message\n", app->name);
664 handler(data, app->name, message);
667 void app_deactivate(struct app *app)
669 SCOPED_AO2LOCK(lock, app);
670 ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
672 ao2_cleanup(app->data);
676 void app_shutdown(struct app *app)
678 SCOPED_AO2LOCK(lock, app);
680 ast_assert(app_is_finished(app));
682 stasis_message_router_unsubscribe(app->router);
684 stasis_unsubscribe(app->bridge_merge_sub);
685 app->bridge_merge_sub = NULL;
/*!
 * \brief Check whether \a app currently has a handler installed.
 *
 * The check is made with the app's lock held.
 *
 * \param app Application to check.
 * \return Non-zero if the app's handler is not NULL, zero otherwise.
 */
int app_is_active(struct app *app)
{
	SCOPED_AO2LOCK(lock, app);
	return app->handler != NULL;
}
/*!
 * \brief Check whether \a app has no handler and no remaining forwards.
 *
 * The check is made with the app's lock held.
 *
 * \param app Application to check.
 * \return Non-zero if the handler is NULL and the forwards container is
 *         empty, zero otherwise.
 */
int app_is_finished(struct app *app)
{
	SCOPED_AO2LOCK(lock, app);

	return app->handler == NULL && ao2_container_count(app->forwards) == 0;
}
701 void app_update(struct app *app, stasis_app_cb handler, void *data)
703 SCOPED_AO2LOCK(lock, app);
706 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
708 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
710 msg = ast_json_pack("{s: s, s: s}",
711 "type", "ApplicationReplaced",
712 "application", app->name);
717 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
720 app->handler = handler;
721 ao2_cleanup(app->data);
728 const char *app_name(const struct app *app)
733 struct ast_json *app_to_json(const struct app *app)
735 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
736 struct ast_json *channels;
737 struct ast_json *bridges;
738 struct ast_json *endpoints;
739 struct ao2_iterator i;
742 json = ast_json_pack("{s: s, s: [], s: [], s: []}",
744 "channel_ids", "bridge_ids", "endpoint_ids");
745 channels = ast_json_object_get(json, "channel_ids");
746 bridges = ast_json_object_get(json, "bridge_ids");
747 endpoints = ast_json_object_get(json, "endpoint_ids");
749 i = ao2_iterator_init(app->forwards, 0);
750 while ((obj = ao2_iterator_next(&i))) {
751 RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
752 RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
755 id = ast_json_string_create(forwards->id);
757 switch (forwards->forward_type) {
758 case FORWARD_CHANNEL:
759 append_res = ast_json_array_append(channels,
763 append_res = ast_json_array_append(bridges,
766 case FORWARD_ENDPOINT:
767 append_res = ast_json_array_append(endpoints,
772 if (append_res != 0) {
773 ast_log(LOG_ERROR, "Error building response\n");
774 ao2_iterator_destroy(&i);
778 ao2_iterator_destroy(&i);
780 return ast_json_ref(json);
783 int app_subscribe_channel(struct app *app, struct ast_channel *chan)
790 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
791 SCOPED_AO2LOCK(lock, app->forwards);
793 forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
794 OBJ_SEARCH_KEY | OBJ_NOLOCK);
796 /* Forwards not found, create one */
797 forwards = forwards_create_channel(app, chan);
802 res = ao2_link_flags(app->forwards, forwards,
809 ++forwards->interested;
814 static int unsubscribe(struct app *app, const char *kind, const char *id)
816 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
817 SCOPED_AO2LOCK(lock, app->forwards);
819 forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
822 "App '%s' not subscribed to %s '%s'\n",
823 app->name, kind, id);
827 if (--forwards->interested == 0) {
828 /* No one is interested any more; unsubscribe */
829 forwards_unsubscribe(forwards);
830 ao2_find(app->forwards, forwards,
831 OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
/*!
 * \brief Unsubscribe \a app from \a chan.
 *
 * Thin wrapper that resolves the channel's unique id and defers to
 * app_unsubscribe_channel_id().
 *
 * \param app Application to unsubscribe.
 * \param chan Channel to unsubscribe from.
 * \return -1 if either argument is NULL, otherwise the result of
 *         app_unsubscribe_channel_id().
 */
int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
{
	if (app && chan) {
		return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
	}

	return -1;
}
/*!
 * \brief Unsubscribe \a app from the channel identified by \a channel_id.
 *
 * \param app Application to unsubscribe.
 * \param channel_id Unique id of the channel.
 * \return -1 if either argument is NULL, otherwise the result of
 *         unsubscribe() for kind "channel".
 */
int app_unsubscribe_channel_id(struct app *app, const char *channel_id)
{
	if (app && channel_id) {
		return unsubscribe(app, "channel", channel_id);
	}

	return -1;
}
856 int app_is_subscribed_channel_id(struct app *app, const char *channel_id)
858 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
859 forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
860 return forwards != NULL;
863 int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
865 if (!app || !bridge) {
868 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
869 SCOPED_AO2LOCK(lock, app->forwards);
871 forwards = ao2_find(app->forwards, bridge->uniqueid,
872 OBJ_SEARCH_KEY | OBJ_NOLOCK);
875 /* Forwards not found, create one */
876 forwards = forwards_create_bridge(app, bridge);
880 ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
883 ++forwards->interested;
888 int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
890 if (!app || !bridge) {
894 return app_unsubscribe_bridge_id(app, bridge->uniqueid);
/*!
 * \brief Unsubscribe \a app from the bridge identified by \a bridge_id.
 *
 * \param app Application to unsubscribe.
 * \param bridge_id Unique id of the bridge.
 * \return -1 if either argument is NULL, otherwise the result of
 *         unsubscribe() for kind "bridge".
 */
int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id)
{
	if (app && bridge_id) {
		return unsubscribe(app, "bridge", bridge_id);
	}

	return -1;
}
906 int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id)
908 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
909 forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
910 return forwards != NULL;
913 int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint)
915 if (!app || !endpoint) {
918 RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
919 SCOPED_AO2LOCK(lock, app->forwards);
921 forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
922 OBJ_SEARCH_KEY | OBJ_NOLOCK);
925 /* Forwards not found, create one */
926 forwards = forwards_create_endpoint(app, endpoint);
930 ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
933 ++forwards->interested;