2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2012 - 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
21 * \brief Stasis application support.
23 * \author David M. Lee, II <dlee@digium.com>
27 <support_level>core</support_level>
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34 #include "asterisk/astobj2.h"
35 #include "asterisk/channel.h"
36 #include "asterisk/lock.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_app.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/strings.h"
43 /*! Time to wait for a frame in the application */
44 #define MAX_WAIT_MS 200
47 * \brief Number of buckets for the Stasis application hash table. Remember to
48 * keep it a prime number!
50 #define APPS_NUM_BUCKETS 127
53 * \brief Number of buckets for the Stasis application hash table. Remember to
54 * keep it a prime number!
56 #define CONTROLS_NUM_BUCKETS 127
59 * \brief Stasis application container. Please call apps_registry() instead of
62 struct ao2_container *__apps_registry;
64 struct ao2_container *__app_controls;
66 /*! Ref-counting accessor for the stasis applications container */
67 static struct ao2_container *apps_registry(void)
69 ao2_ref(__apps_registry, +1);
70 return __apps_registry;
73 static struct ao2_container *app_controls(void)
75 ao2_ref(__app_controls, +1);
76 return __app_controls;
80 /*! Callback function for this application. */
81 stasis_app_cb handler;
82 /*! Opaque data to hand to callback function. */
84 /*! Name of the Stasis application */
88 static void app_dtor(void *obj)
90 struct app *app = obj;
92 ao2_cleanup(app->data);
96 /*! Constructor for \ref app. */
97 static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
102 ast_assert(name != NULL);
103 ast_assert(handler != NULL);
105 size = sizeof(*app) + strlen(name) + 1;
106 app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
112 strncpy(app->name, name, size - sizeof(*app));
113 app->handler = handler;
120 /*! AO2 hash function for \ref app */
121 static int app_hash(const void *obj, const int flags)
123 const struct app *app = obj;
124 const char *name = flags & OBJ_KEY ? obj : app->name;
126 return ast_str_hash(name);
129 /*! AO2 comparison function for \ref app */
130 static int app_compare(void *lhs, void *rhs, int flags)
132 const struct app *lhs_app = lhs;
133 const struct app *rhs_app = rhs;
134 const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
136 if (strcmp(lhs_app->name, rhs_name) == 0) {
137 return CMP_MATCH | CMP_STOP;
144 * \brief Send a message to the given application.
145 * \param app App to send the message to.
146 * \param message Message to send.
148 static void app_send(struct app *app, struct ast_json *message)
150 app->handler(app->data, app->name, message);
153 typedef void* (*stasis_app_command_cb)(struct stasis_app_control *control,
154 struct ast_channel *chan,
157 struct stasis_app_command {
159 ast_cond_t condition;
160 stasis_app_command_cb callback;
166 static void command_dtor(void *obj)
168 struct stasis_app_command *command = obj;
169 ast_mutex_destroy(&command->lock);
170 ast_cond_destroy(&command->condition);
173 static struct stasis_app_command *command_create(stasis_app_command_cb callback,
176 RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
178 command = ao2_alloc(sizeof(*command), command_dtor);
183 ast_mutex_init(&command->lock);
184 ast_cond_init(&command->condition, 0);
185 command->callback = callback;
186 command->data = data;
188 ao2_ref(command, +1);
192 static void command_complete(struct stasis_app_command *command, void *retval)
194 SCOPED_MUTEX(lock, &command->lock);
196 command->is_done = 1;
197 command->retval = retval;
198 ast_cond_signal(&command->condition);
201 static void *wait_for_command(struct stasis_app_command *command)
203 SCOPED_MUTEX(lock, &command->lock);
204 while (!command->is_done) {
205 ast_cond_wait(&command->condition, &command->lock);
208 return command->retval;
211 struct stasis_app_control {
212 /*! Queue of commands to dispatch on the channel */
213 struct ao2_container *command_queue;
215 * When set, /c app_stasis should exit and continue in the dialplan.
217 int continue_to_dialplan:1;
218 /*! Uniqueid of the associated channel */
222 static struct stasis_app_control *control_create(const char *uniqueid)
224 struct stasis_app_control *control;
227 size = sizeof(*control) + strlen(uniqueid) + 1;
228 control = ao2_alloc(size, NULL);
233 control->command_queue = ao2_container_alloc_list(0, 0, NULL, NULL);
235 strncpy(control->channel_id, uniqueid, size - sizeof(*control));
240 static void *exec_command(struct stasis_app_control *control,
241 struct stasis_app_command *command)
244 ao2_ref(command, +1);
245 ao2_link(control->command_queue, command);
248 return wait_for_command(command);
251 /*! AO2 hash function for \ref stasis_app_control */
252 static int control_hash(const void *obj, const int flags)
254 const struct stasis_app_control *control = obj;
255 const char *id = flags & OBJ_KEY ? obj : control->channel_id;
257 return ast_str_hash(id);
260 /*! AO2 comparison function for \ref stasis_app_control */
261 static int control_compare(void *lhs, void *rhs, int flags)
263 const struct stasis_app_control *lhs_control = lhs;
264 const struct stasis_app_control *rhs_control = rhs;
265 const char *rhs_name =
266 flags & OBJ_KEY ? rhs : rhs_control->channel_id;
268 if (strcmp(lhs_control->channel_id, rhs_name) == 0) {
269 return CMP_MATCH | CMP_STOP;
275 struct stasis_app_control *stasis_app_control_find_by_channel(
276 const struct ast_channel *chan)
282 return stasis_app_control_find_by_channel_id(
283 ast_channel_uniqueid(chan));
286 struct stasis_app_control *stasis_app_control_find_by_channel_id(
287 const char *channel_id)
289 RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
290 controls = app_controls();
291 return ao2_find(controls, channel_id, OBJ_KEY);
295 * \brief Test the \c continue_to_dialplan bit for the given \a app.
297 * The bit is also reset for the next call.
299 * \param app Application to check the \c continue_to_dialplan bit.
300 * \return Zero to remain in \c Stasis
301 * \return Non-zero to continue in the dialplan
303 static int control_continue_test_and_reset(struct stasis_app_control *control)
306 SCOPED_AO2LOCK(lock, control);
308 r = control->continue_to_dialplan;
309 control->continue_to_dialplan = 0;
313 void stasis_app_control_continue(struct stasis_app_control *control)
315 SCOPED_AO2LOCK(lock, control);
316 control->continue_to_dialplan = 1;
320 static int FAIL = -1;
322 static void *__app_control_answer(struct stasis_app_control *control,
323 struct ast_channel *chan, void *data)
325 ast_debug(3, "%s: Answering", control->channel_id);
326 return __ast_answer(chan, 0, 1) == 0 ? &OK : &FAIL;
329 int stasis_app_control_answer(struct stasis_app_control *control)
331 RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
334 ast_debug(3, "%s: Sending answer command\n", control->channel_id);
336 command = command_create(__app_control_answer, NULL);
337 retval = exec_command(control, command);
340 ast_log(LOG_WARNING, "Failed to answer channel");
346 static struct ast_json *app_event_create(
347 const char *event_name,
348 const struct ast_channel_snapshot *snapshot,
349 const struct ast_json *extra_info)
351 RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
352 RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
355 event = ast_json_deep_copy(extra_info);
357 event = ast_json_object_create();
363 /* Mustn't already have a channel field */
364 ast_assert(ast_json_object_get(event, "channel") == NULL);
366 ret = ast_json_object_set(
368 "channel", ast_channel_snapshot_to_json(snapshot));
374 message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
376 return ast_json_ref(message);
379 static int send_start_msg(struct app *app, struct ast_channel *chan,
380 int argc, char *argv[])
382 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
383 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
385 struct ast_json *json_args;
388 ast_assert(chan != NULL);
390 /* Set channel info */
391 snapshot = ast_channel_snapshot_create(chan);
396 msg = ast_json_pack("{s: {s: [], s: o}}",
399 "channel", ast_channel_snapshot_to_json(snapshot));
405 /* Append arguments to args array */
406 json_args = ast_json_object_get(
407 ast_json_object_get(msg, "stasis-start"),
409 ast_assert(json_args != NULL);
410 for (i = 0; i < argc; ++i) {
411 int r = ast_json_array_append(json_args,
412 ast_json_string_create(argv[i]));
414 ast_log(LOG_ERROR, "Error appending start message\n");
423 static int send_end_msg(struct app *app, struct ast_channel *chan)
425 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
426 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
428 ast_assert(chan != NULL);
430 /* Set channel info */
431 snapshot = ast_channel_snapshot_create(chan);
432 if (snapshot == NULL) {
435 msg = app_event_create("stasis-end", snapshot, NULL);
444 static void dtmf_handler(struct app *app, struct ast_channel_blob *obj)
446 RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
447 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
448 const char *direction;
450 /* To simplify events, we'll only generate on receive */
451 direction = ast_json_string_get(
452 ast_json_object_get(obj->blob, "direction"));
454 if (strcmp("Received", direction) != 0) {
458 extra = ast_json_pack(
460 "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
465 msg = app_event_create("dtmf-received", obj->snapshot, extra);
473 static void blob_handler(struct app *app, struct ast_channel_blob *blob)
475 /* To simplify events, we'll only generate on DTMF end */
476 if (strcmp(ast_channel_blob_json_type(blob), "dtmf_end") == 0) {
477 dtmf_handler(app, blob);
481 static void sub_handler(void *data, struct stasis_subscription *sub,
482 struct stasis_topic *topic,
483 struct stasis_message *message)
485 struct app *app = data;
486 if (ast_channel_snapshot_type() == stasis_message_type(message)) {
487 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
488 struct ast_channel_snapshot *snapshot =
489 stasis_message_data(message);
491 msg = app_event_create("channel-state-change", snapshot, NULL);
496 } else if (ast_channel_blob_type() == stasis_message_type(message)) {
497 struct ast_channel_blob *blob = stasis_message_data(message);
498 blob_handler(app, blob);
500 if (stasis_subscription_final_message(sub, message)) {
506 * \brief In addition to running ao2_cleanup(), this function also removes the
507 * object from the app_controls() container.
509 static void control_unlink(struct stasis_app_control *control)
511 RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
517 controls = app_controls();
518 ao2_unlink_flags(controls, control,
519 OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA);
520 ao2_cleanup(control);
523 static void dispatch_commands(struct stasis_app_control *control,
524 struct ast_channel *chan)
526 struct ao2_iterator i;
529 SCOPED_AO2LOCK(lock, control);
531 i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
533 while ((obj = ao2_iterator_next(&i))) {
534 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
535 void *retval = command->callback(control, chan, command->data);
536 command_complete(command, retval);
539 ao2_iterator_destroy(&i);
543 /*! /brief Stasis dialplan application callback */
544 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
547 RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
548 RAII_VAR(struct app *, app, NULL, ao2_cleanup);
549 RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
550 RAII_VAR(struct stasis_subscription *, subscription, NULL,
555 ast_assert(chan != NULL);
557 app = ao2_find(apps, app_name, OBJ_KEY);
560 "Stasis app '%s' not registered\n", app_name);
565 RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
567 controls = app_controls();
568 control = control_create(ast_channel_uniqueid(chan));
570 ast_log(LOG_ERROR, "Allocated failed\n");
573 ao2_link(controls, control);
577 stasis_subscribe(ast_channel_topic(chan), sub_handler, app);
578 if (subscription == NULL) {
579 ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
580 app_name, ast_channel_name(chan));
583 ao2_ref(app, +1); /* subscription now has a reference */
585 res = send_start_msg(app, chan, argc, argv);
587 ast_log(LOG_ERROR, "Error sending start message to %s\n", app_name);
592 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
596 ast_debug(3, "%s: Hangup\n",
597 ast_channel_uniqueid(chan));
601 if (control_continue_test_and_reset(control)) {
602 ast_debug(3, "%s: Continue\n",
603 ast_channel_uniqueid(chan));
607 r = ast_waitfor(chan, MAX_WAIT_MS);
610 ast_debug(3, "%s: Poll error\n",
611 ast_channel_uniqueid(chan));
615 dispatch_commands(control, chan);
624 ast_debug(3, "%s: No more frames. Must be done, I guess.\n", ast_channel_uniqueid(chan));
628 switch (f->frametype) {
629 case AST_FRAME_CONTROL:
630 if (f->subclass.integer == AST_CONTROL_HANGUP) {
635 /* Not handled; discard */
640 res = send_end_msg(app, chan);
643 "Error sending end message to %s\n", app_name);
650 int stasis_app_send(const char *app_name, struct ast_json *message)
652 RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
653 RAII_VAR(struct app *, app, NULL, ao2_cleanup);
655 app = ao2_find(apps, app_name, OBJ_KEY);
658 /* XXX We can do a better job handling late binding, queueing up
659 * the call for a few seconds to wait for the app to register.
662 "Stasis app '%s' not registered\n", app_name);
666 app_send(app, message);
670 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
672 RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
673 RAII_VAR(struct app *, app, NULL, ao2_cleanup);
675 SCOPED_LOCK(apps_lock, apps, ao2_lock, ao2_unlock);
677 app = ao2_find(apps, app_name, OBJ_KEY | OBJ_NOLOCK);
680 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
681 SCOPED_LOCK(app_lock, app, ao2_lock, ao2_unlock);
683 msg = app_event_create("application-replaced", NULL, NULL);
684 app->handler(app->data, app_name, msg);
686 app->handler = handler;
687 ao2_cleanup(app->data);
691 app = app_create(app_name, handler, data);
693 ao2_link_flags(apps, app, OBJ_NOLOCK);
702 void stasis_app_unregister(const char *app_name)
704 RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
707 apps = apps_registry();
708 ao2_cleanup(ao2_find(apps, app_name, OBJ_KEY | OBJ_UNLINK));
712 static int load_module(void)
717 ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
718 if (__apps_registry == NULL) {
719 return AST_MODULE_LOAD_FAILURE;
722 __app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
723 control_hash, control_compare);
724 if (__app_controls == NULL) {
725 return AST_MODULE_LOAD_FAILURE;
731 static int unload_module(void)
735 ao2_cleanup(__apps_registry);
736 __apps_registry = NULL;
738 ao2_cleanup(__app_controls);
739 __app_controls = NULL;
744 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS,
745 "Stasis application support",
747 .unload = unload_module);