#include "asterisk/module.h"
#include "asterisk/app.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<application name="NoCDR" language="en_US">
static int publish_app_cdr_message(struct ast_channel *chan, struct app_cdr_message_payload *payload)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- message = stasis_message_create(appcdr_message_type(), payload);
- if (!message) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
- payload->channel_name);
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
return -1;
}
- subscription = stasis_subscribe(ast_channel_topic(chan), appcdr_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ message = stasis_message_create(appcdr_message_type(), payload);
+ if (!message) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
payload->channel_name);
return -1;
}
+ stasis_message_router_publish_sync(router, message);
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
return 0;
}
static int unload_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+ if (router) {
+ stasis_message_router_remove(router, appcdr_message_type());
+ }
STASIS_MESSAGE_TYPE_CLEANUP(appcdr_message_type);
ast_unregister_application(nocdr_app);
ast_unregister_application(resetcdr_app);
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(appcdr_message_type);
res |= ast_register_application_xml(nocdr_app, nocdr_exec);
res |= ast_register_application_xml(resetcdr_app, resetcdr_exec);
+ res |= stasis_message_router_add(router, appcdr_message_type(),
+ appcdr_callback, NULL);
if (res) {
return AST_MODULE_LOAD_FAILURE;
#include "asterisk/app.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<application name="ForkCDR" language="en_US">
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct fork_cdr_message_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
char *parse;
struct ast_flags flags = { 0, };
return -1;
}
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
payload->channel_name = ast_channel_name(chan);
payload->flags = &flags;
message = stasis_message_create(forkcdr_message_type(), payload);
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), forkcdr_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create subscription\n",
- payload->channel_name);
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
static int unload_module(void)
{
- STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- return ast_unregister_application(app);
+ if (router) {
+ stasis_message_router_remove(router, forkcdr_message_type());
+ }
+ STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
+ ast_unregister_application(app);
+ return 0;
}
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(forkcdr_message_type);
res |= ast_register_application_xml(app, forkcdr_exec);
+ res |= stasis_message_router_add(router, forkcdr_message_type(),
+ forkcdr_callback, NULL);
- return res;
+ if (res) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Fork The CDR into 2 separate entities");
#include "asterisk/app.h"
#include "asterisk/cdr.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<function name="CDR" language="en_US">
const char *cmd;
const char *arguments;
const char *value;
+ void *data;
};
struct cdr_func_data {
static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
- struct cdr_func_data *output = data;
struct cdr_func_payload *payload = stasis_message_data(message);
+ struct cdr_func_data *output;
char *info;
char *value = NULL;
struct ast_flags flags = { 0 };
return;
}
- if (!payload || !output) {
- return;
- }
+ ast_assert(payload != NULL);
+ output = payload->data;
+ ast_assert(output != NULL);
if (ast_strlen_zero(payload->arguments)) {
ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)",
payload->chan = chan;
payload->cmd = cmd;
payload->arguments = parse;
+ payload->data = &output;
buf[0] = '\0';/* Ensure the buffer is initialized. */
output.buf = buf;
if (ast_strlen_zero(ast_channel_name(chan))) {
cdr_read_callback(NULL, NULL, message);
} else {
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
ast_channel_name(chan));
return -1;
}
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
}
return 0;
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
- ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router,
+ ast_cdr_message_router(), ao2_cleanup);
+
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
if (!payload) {
return -1;
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
- ast_channel_name(chan));
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
if (!payload) {
return -1;
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
- ast_channel_name(chan));
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
static int unload_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (router) {
+ stasis_message_router_remove(router, cdr_prop_write_message_type());
+ stasis_message_router_remove(router, cdr_write_message_type());
+ stasis_message_router_remove(router, cdr_read_message_type());
+ }
STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type);
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type);
res |= ast_custom_function_register(&cdr_function);
res |= ast_custom_function_register(&cdr_prop_function);
-
- return res;
+ res |= stasis_message_router_add(router, cdr_prop_write_message_type(),
+ cdr_prop_write_callback, NULL);
+ res |= stasis_message_router_add(router, cdr_write_message_type(),
+ cdr_write_callback, NULL);
+ res |= stasis_message_router_add(router, cdr_read_message_type(),
+ cdr_read_callback, NULL);
+
+ if (res) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Call Detail Record (CDR) dialplan functions");
*/
struct ast_cdr *ast_cdr_alloc(void);
+struct stasis_message_router;
+
+/*!
+ * \brief Return the message router for the CDR engine
+ *
+ * This returns the \ref stasis_message_router that the CDR engine
+ * uses for dispatching \ref stasis messages. The reference on the
+ * message router is bumped and must be released by the caller of
+ * this function.
+ *
+ * \retval NULL if the CDR engine is disabled or unavailable
+ * \retval the \ref stasis_message_router otherwise
+ */
+struct stasis_message_router *ast_cdr_message_router(void);
/*!
* \brief Duplicate a public CDR
ast_cdr_engine_term();
}
+struct stasis_message_router *ast_cdr_message_router(void)
+{
+ if (!stasis_router) {
+ return NULL;
+ }
+
+ ao2_bump(stasis_router);
+ return stasis_router;
+}
+
/*!
- * \brief Destroy the active Stasis subscriptions/router/topics
+ * \brief Destroy the active Stasis subscriptions
*/
static void destroy_subscriptions(void)
{
- stasis_message_router_unsubscribe_and_join(stasis_router);
- stasis_router = NULL;
-
- ao2_cleanup(cdr_topic);
- cdr_topic = NULL;
-
channel_subscription = stasis_forward_cancel(channel_subscription);
bridge_subscription = stasis_forward_cancel(bridge_subscription);
parking_subscription = stasis_forward_cancel(parking_subscription);
*/
static int create_subscriptions(void)
{
- /* Use the CDR topic to determine if we've already created this */
- if (cdr_topic) {
- return 0;
- }
-
- cdr_topic = stasis_topic_create("cdr_engine");
if (!cdr_topic) {
return -1;
}
+ if (channel_subscription || bridge_subscription || parking_subscription) {
+ return 0;
+ }
+
channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
return -1;
}
- stasis_router = stasis_message_router_create(cdr_topic);
- if (!stasis_router) {
- return -1;
- }
- stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
- stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
-
return 0;
}
static void cdr_engine_shutdown(void)
{
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+
+ ao2_cleanup(cdr_topic);
+ cdr_topic = NULL;
+
ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
NULL);
finalize_batch_mode();
return -1;
}
+ cdr_topic = stasis_topic_create("cdr_engine");
+ if (!cdr_topic) {
+ return -1;
+ }
+
+ stasis_router = stasis_message_router_create(cdr_topic);
+ if (!stasis_router) {
+ return -1;
+ }
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
+ stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
+
active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS,
cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn);
if (!active_cdrs_by_channel) {