#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
-
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_bridges.h"
+#include "asterisk/stasis_endpoints.h"
+#include "asterisk/config_options.h"
+
+/*** DOCUMENTATION
+ <managerEvent language="en_US" name="UserEvent">
+ <managerEventInstance class="EVENT_FLAG_USER">
+ <synopsis>A user defined event raised from the dialplan.</synopsis>
+ <syntax>
+ <channel_snapshot/>
+ <parameter name="UserEvent">
+ <para>The event name, as specified in the dialplan.</para>
+ </parameter>
+ </syntax>
+ <description>
+ <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
+ </description>
+ <see-also>
+ <ref type="application">UserEvent</ref>
+ <ref type="managerEvent">UserEvent</ref>
+ </see-also>
+ </managerEventInstance>
+ </managerEvent>
+ <configInfo name="stasis" language="en_US">
+ <configFile name="stasis.conf">
+ <configObject name="threadpool">
+ <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+ <configOption name="initial_size" default="5">
+ <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+ </configOption>
+ <configOption name="idle_timeout_sec" default="20">
+ <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+ </configOption>
+ <configOption name="max_size" default="50">
+ <synopsis>Maximum number of threads in the threadpool.</synopsis>
+ </configOption>
+ </configObject>
+ <configObject name="declined_message_types">
+ <synopsis>Stasis message types for which to decline creation.</synopsis>
+ <configOption name="decline">
+ <synopsis>The message type to decline.</synopsis>
+ <description>
+ <para>This configuration option defines the name of the Stasis
+ message type that Asterisk is forbidden from creating and can be
+ specified as many times as necessary to achieve the desired result.</para>
+ <enumlist>
+ <enum name="stasis_app_recording_snapshot_type" />
+ <enum name="stasis_app_playback_snapshot_type" />
+ <enum name="stasis_test_message_type" />
+ <enum name="confbridge_start_type" />
+ <enum name="confbridge_end_type" />
+ <enum name="confbridge_join_type" />
+ <enum name="confbridge_leave_type" />
+ <enum name="confbridge_start_record_type" />
+ <enum name="confbridge_stop_record_type" />
+ <enum name="confbridge_mute_type" />
+ <enum name="confbridge_unmute_type" />
+ <enum name="confbridge_talking_type" />
+ <enum name="cel_generic_type" />
+ <enum name="ast_bridge_snapshot_type" />
+ <enum name="ast_bridge_merge_message_type" />
+ <enum name="ast_channel_entered_bridge_type" />
+ <enum name="ast_channel_left_bridge_type" />
+ <enum name="ast_blind_transfer_type" />
+ <enum name="ast_attended_transfer_type" />
+ <enum name="ast_endpoint_snapshot_type" />
+ <enum name="ast_endpoint_state_type" />
+ <enum name="ast_device_state_message_type" />
+ <enum name="ast_test_suite_message_type" />
+ <enum name="ast_mwi_state_type" />
+ <enum name="ast_mwi_vm_app_type" />
+ <enum name="ast_format_register_type" />
+ <enum name="ast_format_unregister_type" />
+ <enum name="ast_manager_get_generic_type" />
+ <enum name="ast_parked_call_type" />
+ <enum name="ast_channel_snapshot_type" />
+ <enum name="ast_channel_dial_type" />
+ <enum name="ast_channel_varset_type" />
+ <enum name="ast_channel_hangup_request_type" />
+ <enum name="ast_channel_dtmf_begin_type" />
+ <enum name="ast_channel_dtmf_end_type" />
+ <enum name="ast_channel_hold_type" />
+ <enum name="ast_channel_unhold_type" />
+ <enum name="ast_channel_chanspy_start_type" />
+ <enum name="ast_channel_chanspy_stop_type" />
+ <enum name="ast_channel_fax_type" />
+ <enum name="ast_channel_hangup_handler_type" />
+ <enum name="ast_channel_moh_start_type" />
+ <enum name="ast_channel_moh_stop_type" />
+ <enum name="ast_channel_monitor_start_type" />
+ <enum name="ast_channel_monitor_stop_type" />
+ <enum name="ast_channel_agent_login_type" />
+ <enum name="ast_channel_agent_logoff_type" />
+ <enum name="ast_channel_talking_start" />
+ <enum name="ast_channel_talking_stop" />
+ <enum name="ast_security_event_type" />
+ <enum name="ast_named_acl_change_type" />
+ <enum name="ast_local_bridge_type" />
+ <enum name="ast_local_optimization_begin_type" />
+ <enum name="ast_local_optimization_end_type" />
+ <enum name="stasis_subscription_change_type" />
+ <enum name="ast_multi_user_event_type" />
+ <enum name="stasis_cache_clear_type" />
+ <enum name="stasis_cache_update_type" />
+ <enum name="ast_network_change_type" />
+ <enum name="ast_system_registry_type" />
+ <enum name="ast_cc_available_type" />
+ <enum name="ast_cc_offertimerstart_type" />
+ <enum name="ast_cc_requested_type" />
+ <enum name="ast_cc_requestacknowledged_type" />
+ <enum name="ast_cc_callerstopmonitoring_type" />
+ <enum name="ast_cc_callerstartmonitoring_type" />
+ <enum name="ast_cc_callerrecalling_type" />
+ <enum name="ast_cc_recallcomplete_type" />
+ <enum name="ast_cc_failure_type" />
+ <enum name="ast_cc_monitorfailed_type" />
+ <enum name="ast_presence_state_message_type" />
+ <enum name="ast_rtp_rtcp_sent_type" />
+ <enum name="ast_rtp_rtcp_received_type" />
+ <enum name="ast_call_pickup_type" />
+ <enum name="aoc_s_type" />
+ <enum name="aoc_d_type" />
+ <enum name="aoc_e_type" />
+ <enum name="dahdichannel_type" />
+ <enum name="mcid_type" />
+ <enum name="session_timeout_type" />
+ <enum name="cdr_read_message_type" />
+ <enum name="cdr_write_message_type" />
+ <enum name="cdr_prop_write_message_type" />
+ <enum name="corosync_ping_message_type" />
+ <enum name="agi_exec_start_type" />
+ <enum name="agi_exec_end_type" />
+ <enum name="agi_async_start_type" />
+ <enum name="agi_async_exec_type" />
+ <enum name="agi_async_end_type" />
+ <enum name="queue_caller_join_type" />
+ <enum name="queue_caller_leave_type" />
+ <enum name="queue_caller_abandon_type" />
+ <enum name="queue_member_status_type" />
+ <enum name="queue_member_added_type" />
+ <enum name="queue_member_removed_type" />
+ <enum name="queue_member_pause_type" />
+ <enum name="queue_member_penalty_type" />
+ <enum name="queue_member_ringinuse_type" />
+ <enum name="queue_agent_called_type" />
+ <enum name="queue_agent_connect_type" />
+ <enum name="queue_agent_complete_type" />
+ <enum name="queue_agent_dump_type" />
+ <enum name="queue_agent_ringnoanswer_type" />
+ <enum name="meetme_join_type" />
+ <enum name="meetme_leave_type" />
+ <enum name="meetme_end_type" />
+ <enum name="meetme_mute_type" />
+ <enum name="meetme_talking_type" />
+ <enum name="meetme_talk_request_type" />
+ <enum name="appcdr_message_type" />
+ <enum name="forkcdr_message_type" />
+ <enum name="cdr_sync_message_type" />
+ </enumlist>
+ </description>
+ </configOption>
+ </configObject>
+ </configFile>
+ </configInfo>
+***/
/*!
* \page stasis-impl Stasis Implementation Notes
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
struct stasis_topic *stasis_topic_create(const char *name)
{
- RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ struct stasis_topic *topic;
int res = 0;
- topic = ao2_alloc(sizeof(*topic), topic_dtor);
-
+ topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
if (!topic) {
return NULL;
}
topic->name = ast_strdup(name);
- if (!topic->name) {
- return NULL;
- }
-
res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
-
- if (res != 0) {
+ if (!topic->name || res) {
+ ao2_cleanup(topic);
return NULL;
}
- ao2_ref(topic, +1);
return topic;
}
/*! Data pointer to be handed to the callback. */
void *data;
- /*! Lock for completion flags \c final_message_{rxed,processed}. */
- ast_mutex_t join_lock;
/*! Condition for joining with subscription. */
ast_cond_t join_cond;
/*! Flag set when final message for sub has been received.
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
- ast_mutex_destroy(&sub->join_lock);
ast_cond_destroy(&sub->join_cond);
}
{
/* Notify that the final message has been received */
if (stasis_subscription_final_message(sub, message)) {
- SCOPED_MUTEX(lock, &sub->join_lock);
+ ao2_lock(sub);
sub->final_message_rxed = 1;
ast_cond_signal(&sub->join_cond);
+ ao2_unlock(sub);
}
/* Since sub is mostly immutable, no need to lock sub */
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
- SCOPED_MUTEX(lock, &sub->join_lock);
+ ao2_lock(sub);
sub->final_message_processed = 1;
ast_cond_signal(&sub->join_cond);
+ ao2_unlock(sub);
}
}
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+}
+
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
- int needs_mailbox)
+ int needs_mailbox,
+ int use_thread_pool)
{
- RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+ struct stasis_subscription *sub;
if (!topic) {
return NULL;
}
- sub = ao2_alloc(sizeof(*sub), subscription_dtor);
+ /* The ao2 lock is used for join_cond. */
+ sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
if (!sub) {
return NULL;
}
-
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
- /* With a small number of subscribers, a thread-per-sub is
- * acceptable. If our usage changes so that we have larger
- * numbers of subscribers, we'll probably want to consider
- * a threadpool. We had that originally, but with so few
- * subscribers it was actually a performance loss instead of
- * a gain.
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+ use_thread_pool ? 'p' : 'm',
+ stasis_topic_name(topic));
+
+ /*
+ * With a small number of subscribers, a thread-per-sub is
+ * acceptable. For a large number of subscribers, a thread
+ * pool should be used.
*/
- sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
- TPS_REF_DEFAULT);
+ if (use_thread_pool) {
+ sub->mailbox = ast_threadpool_serializer(tps_name, pool);
+ } else {
+ sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
+ }
if (!sub->mailbox) {
+ ao2_ref(sub, -1);
+
return NULL;
}
ast_taskprocessor_set_local(sub->mailbox, sub);
sub->topic = topic;
sub->callback = callback;
sub->data = data;
- ast_mutex_init(&sub->join_lock);
ast_cond_init(&sub->join_cond, NULL);
if (topic_add_subscription(topic, sub) != 0) {
+ ao2_ref(sub, -1);
+
return NULL;
}
send_subscription_subscribe(topic, sub);
- ao2_ref(sub, +1);
return sub;
}
stasis_subscription_cb callback,
void *data)
{
- return internal_stasis_subscribe(topic, callback, data, 1);
+ return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data)
+{
+ return internal_stasis_subscribe(topic, callback, data, 1, 1);
}
static int sub_cleanup(void *data)
{
/* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock. */
- RAII_VAR(struct stasis_topic *, topic,
- ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
+ struct stasis_topic *topic;
if (!sub) {
return NULL;
}
+ topic = ao2_bump(sub->topic);
+
/* We have to remove the subscription first, to ensure the unsubscribe
* is the final message */
if (topic_remove_subscription(sub->topic, sub) != 0) {
ast_log(LOG_ERROR,
"Internal error: subscription has invalid topic\n");
+ ao2_cleanup(topic);
+
return NULL;
}
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
+ ao2_cleanup(topic);
+
return NULL;
}
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+ long low_water, long high_water)
+{
+ int res = -1;
+
+ if (subscription) {
+ res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
+ low_water, high_water);
+ }
+ return res;
+}
+
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
- SCOPED_MUTEX(lock, &subscription->join_lock);
+ ao2_lock(subscription);
/* Wait until the processed flag has been set */
while (!subscription->final_message_processed) {
ast_cond_wait(&subscription->join_cond,
- &subscription->join_lock);
+ ao2_object_get_lockaddr(subscription));
}
+ ao2_unlock(subscription);
}
}
int stasis_subscription_is_done(struct stasis_subscription *subscription)
{
if (subscription) {
- SCOPED_MUTEX(lock, &subscription->join_lock);
- return subscription->final_message_rxed;
+ int ret;
+
+ ao2_lock(subscription);
+ ret = subscription->final_message_rxed;
+ ao2_unlock(subscription);
+
+ return ret;
}
/* Null subscription is about as done as you can get */
if (sub) {
size_t i;
struct stasis_topic *topic = sub->topic;
- SCOPED_AO2LOCK(lock_topic, topic);
+ ao2_lock(topic);
for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
+ ao2_unlock(topic);
return 1;
}
}
+ ao2_unlock(topic);
}
return 0;
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
{
struct stasis_subscription_change *change;
+
if (stasis_message_type(msg) != stasis_subscription_change_type()) {
return 0;
}
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
size_t idx;
- SCOPED_AO2LOCK(lock, topic);
+ ao2_lock(topic);
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
* to release it.
topic_add_subscription(
AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
}
+ ao2_unlock(topic);
return 0;
}
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
size_t idx;
- SCOPED_AO2LOCK(lock_topic, topic);
+ int res;
+ ao2_lock(topic);
for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
topic_remove_subscription(
AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
}
-
- return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
+ res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
AST_VECTOR_ELEM_CLEANUP_NOOP);
+ ao2_unlock(topic);
+
+ return res;
}
/*!
*/
ao2_bump(message);
if (!synchronous) {
- if (ast_taskprocessor_push_local(sub->mailbox,
- dispatch_exec_async,
- message) != 0) {
+ if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
/* Push failed; ugh. */
ast_log(LOG_ERROR, "Dropping async dispatch\n");
ao2_cleanup(message);
std.complete = 0;
std.task_data = message;
- if (ast_taskprocessor_push_local(sub->mailbox,
- dispatch_exec_sync,
- &std)) {
+ if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
/* Push failed; ugh. */
ast_log(LOG_ERROR, "Dropping sync dispatch\n");
ao2_cleanup(message);
+ ast_mutex_destroy(&std.lock);
+ ast_cond_destroy(&std.cond);
return;
}
from = forward->from_topic;
to = forward->to_topic;
- topic_lock_both(to, from);
- AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
- AST_VECTOR_ELEM_CLEANUP_NOOP);
+ if (from && to) {
+ topic_lock_both(to, from);
+ AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
+ AST_VECTOR_ELEM_CLEANUP_NOOP);
- for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
- topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+ for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
+ topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
+ }
+ ao2_unlock(from);
+ ao2_unlock(to);
}
- ao2_unlock(from);
- ao2_unlock(to);
ao2_cleanup(forward);
{
int res;
size_t idx;
- RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+ struct stasis_forward *forward;
if (!from_topic || !to_topic) {
return NULL;
}
- forward = ao2_alloc(sizeof(*forward), forward_dtor);
+ forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!forward) {
return NULL;
}
+ /* Forwards to ourselves are implicit. */
+ if (to_topic == from_topic) {
+ return forward;
+ }
+
forward->from_topic = ao2_bump(from_topic);
forward->to_topic = ao2_bump(to_topic);
if (res != 0) {
ao2_unlock(from_topic);
ao2_unlock(to_topic);
+ ao2_ref(forward, -1);
return NULL;
}
ao2_unlock(from_topic);
ao2_unlock(to_topic);
- return ao2_bump(forward);
+ return forward;
}
static void subscription_change_dtor(void *obj)
{
struct stasis_subscription_change *change = obj;
+
ast_string_field_free_memory(change);
ao2_cleanup(change->topic);
}
static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
- RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ struct stasis_subscription_change *change;
change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
- if (ast_string_field_init(change, 128)) {
+ if (!change || ast_string_field_init(change, 128)) {
+ ao2_cleanup(change);
return NULL;
}
ao2_ref(topic, +1);
change->topic = topic;
- ao2_ref(change, +1);
return change;
}
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ struct stasis_subscription_change *change;
+ struct stasis_message *msg;
/* This assumes that we have already unsubscribed */
ast_assert(stasis_subscription_is_subscribed(sub));
- change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
+ if (!stasis_subscription_change_type()) {
+ return;
+ }
+ change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
}
msg = stasis_message_create(stasis_subscription_change_type(), change);
-
if (!msg) {
+ ao2_cleanup(change);
return;
}
stasis_publish(topic, msg);
+ ao2_cleanup(msg);
+ ao2_cleanup(change);
}
static void send_subscription_unsubscribe(struct stasis_topic *topic,
struct stasis_subscription *sub)
{
- RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+ struct stasis_subscription_change *change;
+ struct stasis_message *msg;
/* This assumes that we have already unsubscribed */
ast_assert(!stasis_subscription_is_subscribed(sub));
- change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+ if (!stasis_subscription_change_type()) {
+ return;
+ }
+ change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
if (!change) {
return;
}
msg = stasis_message_create(stasis_subscription_change_type(), change);
-
if (!msg) {
+ ao2_cleanup(change);
return;
}
/* Now we have to dispatch to the subscription itself */
dispatch_message(sub, msg, 0);
+
+ ao2_cleanup(msg);
+ ao2_cleanup(change);
}
struct topic_pool_entry {
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
+
entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
static struct topic_pool_entry *topic_pool_entry_alloc(void)
{
- return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
+ return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
}
struct stasis_topic_pool {
static void topic_pool_dtor(void *obj)
{
struct stasis_topic_pool *pool = obj;
+
ao2_cleanup(pool->pool_container);
pool->pool_container = NULL;
ao2_cleanup(pool->pool_topic);
static int topic_pool_entry_hash(const void *obj, const int flags)
{
- const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
- return ast_str_case_hash(topic_name);
+ const struct topic_pool_entry *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = stasis_topic_name(object->topic);
+ break;
+ default:
+ /* Hash can only work on something with a full key. */
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_case_hash(key);
}
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
{
- struct topic_pool_entry *opt1 = obj, *opt2 = arg;
- const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
- return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
+ const struct topic_pool_entry *object_left = obj;
+ const struct topic_pool_entry *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = stasis_topic_name(object_right->topic);
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /* Not supported by container */
+ ast_assert(0);
+ cmp = -1;
+ break;
+ default:
+ /*
+ * What arg points to is specific to this traversal callback
+ * and has no special meaning to astobj2.
+ */
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ /*
+ * At this point the traversal callback is identical to a sorted
+ * container.
+ */
+ return CMP_MATCH;
}
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
{
- RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
+ struct stasis_topic_pool *pool;
+
+ pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!pool) {
return NULL;
}
- pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
+
+ pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
+ topic_pool_entry_hash, topic_pool_entry_cmp);
+ if (!pool->pool_container) {
+ ao2_cleanup(pool);
+ return NULL;
+ }
ao2_ref(pooled_topic, +1);
pool->pool_topic = pooled_topic;
- ao2_ref(pool, +1);
return pool;
}
{
RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
- topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
+ topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (topic_pool_entry) {
return topic_pool_entry->topic;
}
topic_pool_entry = topic_pool_entry_alloc();
-
if (!topic_pool_entry) {
return NULL;
}
return NULL;
}
- ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
+ if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
+ return NULL;
+ }
return topic_pool_entry->topic;
}
+int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
+{
+ struct topic_pool_entry *topic_pool_entry;
+
+ topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
+ if (!topic_pool_entry) {
+ return 0;
+ }
+
+ ao2_ref(topic_pool_entry, -1);
+ return 1;
+}
+
void stasis_log_bad_type_access(const char *name)
{
- ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
+#ifdef AST_DEVMODE
+ if (!stasis_message_type_declined(name)) {
+ ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
+ }
+#endif
+}
+
+/*! \brief A multi object blob data structure to carry user event stasis messages */
+struct ast_multi_object_blob {
+ struct ast_json *blob; /*< A blob of JSON data */
+ AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
+};
+
+/*!
+ * \internal
+ * \brief Destructor for \ref ast_multi_object_blob objects
+ */
+static void multi_object_blob_dtor(void *obj)
+{
+ struct ast_multi_object_blob *multi = obj;
+ int type;
+ int i;
+
+ for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+ for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+ ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
+ }
+ AST_VECTOR_FREE(&multi->snapshots[type]);
+ }
+ ast_json_unref(multi->blob);
+}
+
+/*! \brief Create a stasis user event multi object blob */
+struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
+{
+ int type;
+ struct ast_multi_object_blob *multi;
+
+ ast_assert(blob != NULL);
+
+ multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
+ if (!multi) {
+ return NULL;
+ }
+
+ for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+ if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
+ ao2_ref(multi, -1);
+
+ return NULL;
+ }
+ }
+
+ multi->blob = ast_json_ref(blob);
+
+ return multi;
+}
+
+/*! \brief Add an object (snapshot) to the blob */
+void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
+ enum stasis_user_multi_object_snapshot_type type, void *object)
+{
+ if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
+ ao2_cleanup(object);
+ }
+}
+
+/*! \brief Publish single channel user event (for app_userevent compatibility) */
+void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
+ struct stasis_message_type *type, struct ast_json *blob)
+{
+ struct stasis_message *message;
+ struct ast_channel_snapshot *channel_snapshot;
+ struct ast_multi_object_blob *multi;
+
+ if (!type) {
+ return;
+ }
+
+ multi = ast_multi_object_blob_create(blob);
+ if (!multi) {
+ return;
+ }
+
+ channel_snapshot = ast_channel_snapshot_create(chan);
+ if (!channel_snapshot) {
+ ao2_ref(multi, -1);
+ return;
+ }
+
+ /* this call steals the channel_snapshot reference */
+ ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
+
+ message = stasis_message_create(type, multi);
+ ao2_ref(multi, -1);
+ if (message) {
+ /* app_userevent still publishes to channel */
+ stasis_publish(ast_channel_topic(chan), message);
+ ao2_ref(message, -1);
+ }
+}
+
+/*! \internal \brief convert multi object blob to ari json */
+static struct ast_json *multi_user_event_to_json(
+ struct stasis_message *message,
+ const struct stasis_message_sanitizer *sanitize)
+{
+ struct ast_json *out;
+ struct ast_multi_object_blob *multi = stasis_message_data(message);
+ struct ast_json *blob = multi->blob;
+ const struct timeval *tv = stasis_message_timestamp(message);
+ enum stasis_user_multi_object_snapshot_type type;
+ int i;
+
+ out = ast_json_object_create();
+ if (!out) {
+ return NULL;
+ }
+
+ ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
+ ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
+ ast_json_object_set(out, "eventname", ast_json_string_create(ast_json_string_get((ast_json_object_get(blob, "eventname")))));
+ ast_json_object_set(out, "userevent", ast_json_deep_copy(blob));
+
+ for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+ for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+ struct ast_json *json_object = NULL;
+ char *name = NULL;
+ void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
+
+ switch (type) {
+ case STASIS_UMOS_CHANNEL:
+ json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
+ name = "channel";
+ break;
+ case STASIS_UMOS_BRIDGE:
+ json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
+ name = "bridge";
+ break;
+ case STASIS_UMOS_ENDPOINT:
+ json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
+ name = "endpoint";
+ break;
+ }
+ if (json_object) {
+ ast_json_object_set(out, name, json_object);
+ }
+ }
+ }
+
+ return out;
+}
+
+/*! \internal \brief convert multi object blob to ami string */
+static struct ast_str *multi_object_blob_to_ami(void *obj)
+{
+ struct ast_str *ami_str=ast_str_create(1024);
+ struct ast_str *ami_snapshot;
+ const struct ast_multi_object_blob *multi = obj;
+ enum stasis_user_multi_object_snapshot_type type;
+ int i;
+
+ if (!ami_str) {
+ return NULL;
+ }
+ if (!multi) {
+ ast_free(ami_str);
+ return NULL;
+ }
+
+ for (type = 0; type < STASIS_UMOS_MAX; ++type) {
+ for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
+ char *name = NULL;
+ void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
+ ami_snapshot = NULL;
+
+ if (i > 0) {
+ ast_asprintf(&name, "%d", i + 1);
+ }
+
+ switch (type) {
+ case STASIS_UMOS_CHANNEL:
+ ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
+ break;
+
+ case STASIS_UMOS_BRIDGE:
+ ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
+ break;
+
+ case STASIS_UMOS_ENDPOINT:
+ /* currently not sending endpoint snapshots to AMI */
+ break;
+ }
+ if (ami_snapshot) {
+ ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
+ ast_free(ami_snapshot);
+ }
+ ast_free(name);
+ }
+ }
+
+ return ami_str;
}
+/*! \internal \brief Callback to pass only user defined parameters from blob */
+static int userevent_exclusion_cb(const char *key)
+{
+ if (!strcmp("eventname", key)) {
+ return 1;
+ }
+ return 0;
+}
+
+static struct ast_manager_event_blob *multi_user_event_to_ami(
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
+ RAII_VAR(struct ast_str *, body, NULL, ast_free);
+ struct ast_multi_object_blob *multi = stasis_message_data(message);
+ const char *eventname;
+
+ eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
+ body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
+ object_string = multi_object_blob_to_ami(multi);
+ if (!object_string || !body) {
+ return NULL;
+ }
+
+ return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
+ "%s"
+ "UserEvent: %s\r\n"
+ "%s",
+ ast_str_buffer(object_string),
+ eventname,
+ ast_str_buffer(body));
+}
+
+/*! \brief A structure to hold global configuration-related options */
+struct stasis_declined_config {
+ /*! The list of message types to decline */
+ struct ao2_container *declined;
+};
+
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+ /*! Initial size of the thread pool */
+ int initial_size;
+ /*! Time, in seconds, before we expire a thread */
+ int idle_timeout_sec;
+ /*! Maximum number of thread to allow */
+ int max_size;
+};
+
+struct stasis_config {
+ /*! Thread pool configuration options */
+ struct stasis_threadpool_conf *threadpool_options;
+ /*! Declined message types */
+ struct stasis_declined_config *declined_message_types;
+};
+
+static struct aco_type threadpool_option = {
+ .type = ACO_GLOBAL,
+ .name = "threadpool",
+ .item_offset = offsetof(struct stasis_config, threadpool_options),
+ .category = "threadpool",
+ .category_match = ACO_WHITELIST_EXACT,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
+/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
+static struct aco_type declined_option = {
+ .type = ACO_GLOBAL,
+ .name = "declined_message_types",
+ .item_offset = offsetof(struct stasis_config, declined_message_types),
+ .category_match = ACO_WHITELIST_EXACT,
+ .category = "declined_message_types",
+};
+
+struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
+
+struct aco_file stasis_conf = {
+ .filename = "stasis.conf",
+ .types = ACO_TYPES(&declined_option, &threadpool_option),
+};
+
+/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
+static AO2_GLOBAL_OBJ_STATIC(globals);
+
+static void *stasis_config_alloc(void);
+
+/*! \brief Register information about the configs being processed by this module */
+CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
+ .files = ACO_FILES(&stasis_conf),
+);
+
+static void stasis_declined_config_destructor(void *obj)
+{
+ struct stasis_declined_config *declined = obj;
+
+ ao2_cleanup(declined->declined);
+}
+
+static void stasis_config_destructor(void *obj)
+{
+ struct stasis_config *cfg = obj;
+
+ ao2_cleanup(cfg->declined_message_types);
+ ast_free(cfg->threadpool_options);
+}
+
+static void *stasis_config_alloc(void)
+{
+ struct stasis_config *cfg;
+
+ if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
+ return NULL;
+ }
+
+ cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+ if (!cfg->threadpool_options) {
+ ao2_ref(cfg, -1);
+ return NULL;
+ }
+
+ cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
+ stasis_declined_config_destructor);
+ if (!cfg->declined_message_types) {
+ ao2_ref(cfg, -1);
+ return NULL;
+ }
+
+ cfg->declined_message_types->declined = ast_str_container_alloc(13);
+ if (!cfg->declined_message_types->declined) {
+ ao2_ref(cfg, -1);
+ return NULL;
+ }
+
+ return cfg;
+}
+
+int stasis_message_type_declined(const char *name)
+{
+ struct stasis_config *cfg = ao2_global_obj_ref(globals);
+ char *name_in_declined;
+ int res;
+
+ if (!cfg || !cfg->declined_message_types) {
+ ao2_cleanup(cfg);
+ return 0;
+ }
+
+ name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
+ res = name_in_declined ? 1 : 0;
+ ao2_cleanup(name_in_declined);
+ ao2_ref(cfg, -1);
+ if (res) {
+ ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
+ }
+ return res;
+}
+
+static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct stasis_declined_config *declined = obj;
+
+ if (ast_strlen_zero(var->value)) {
+ return 0;
+ }
+
+ if (ast_str_container_add(declined->declined, var->value)) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*!
+ * @{ \brief Define multi user event message type(s).
+ */
+
+STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
+ .to_json = multi_user_event_to_json,
+ .to_ami = multi_user_event_to_ami,
+ );
+
+/*! @} */
+
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
+ aco_info_destroy(&cfg_info);
+ ao2_global_obj_release(globals);
}
int stasis_init(void)
{
+ struct stasis_config *cfg;
int cache_init;
+ struct ast_threadpool_options threadpool_opts = { 0, };
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
+ if (aco_info_init(&cfg_info)) {
+ return -1;
+ }
+
+ aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
+ declined_options, "", declined_handler, 0);
+ aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+ threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+ threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+ threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, max_size), 0,
+ INT_MAX);
+
+ if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
+ struct stasis_config *default_cfg = stasis_config_alloc();
+
+ if (!default_cfg) {
+ return -1;
+ }
+
+ if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+ ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+ ao2_ref(default_cfg, -1);
+
+ return -1;
+ }
+
+ if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
+ ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
+ ao2_ref(default_cfg, -1);
+
+ return -1;
+ }
+
+ ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+ ao2_global_obj_replace_unref(globals, default_cfg);
+ cfg = default_cfg;
+ } else {
+ cfg = ao2_global_obj_ref(globals);
+ if (!cfg) {
+ ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+
+ return -1;
+ }
+ }
+
+ threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+ threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+ threadpool_opts.auto_increment = 1;
+ threadpool_opts.max_size = cfg->threadpool_options->max_size;
+ threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+ pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+ ao2_ref(cfg, -1);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+
+ return -1;
+ }
+
cache_init = stasis_cache_init();
if (cache_init != 0) {
return -1;
if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
return -1;
}
+ if (STASIS_MESSAGE_TYPE_INIT(ast_multi_user_event_type) != 0) {
+ return -1;
+ }
return 0;
}