#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/bridge_after.h"
+#include "asterisk/stasis_bridges.h"
+#include "asterisk/core_local.h"
+#include "asterisk/mixmonitor.h"
+#include "asterisk/core_unreal.h"
+#include "asterisk/bridge_basic.h"
/* Define, to debug reference counts on queues, without debugging reference counts on queue members */
/* #define REF_DEBUG_ONLY_QUEUES */
static struct member *interface_exists(struct call_queue *q, const char *interface);
static int set_member_paused(const char *queuename, const char *interface, const char *reason, int paused);
-#if 0 // BUGBUG
-static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan);
-#endif // BUGBUG
-
static struct member *find_member_by_queuename_and_interface(const char *queuename, const char *interface);
/*! \brief sets the QUEUESTATUS channel variable */
static void set_queue_result(struct ast_channel *chan, enum queue_result res)
static inline struct call_queue *_queue_unref(struct call_queue *q, const char *tag, const char *file, int line, const char *filename)
{
- __ao2_ref_debug(q, -1, tag, file, line, filename);
+ if (q) {
+ __ao2_ref_debug(q, -1, tag, file, line, filename);
+ }
return NULL;
}
static inline struct call_queue *queue_unref(struct call_queue *q)
{
- ao2_ref(q, -1);
+ if (q) {
+ ao2_ref(q, -1);
+ }
return NULL;
}
#endif
"%s", ast_str_buffer(event_string));
}
-static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct ast_channel *agent, struct stasis_message_type *type, struct ast_json *blob)
+static void queue_publish_multi_channel_snapshot_blob(struct stasis_topic *topic,
+ struct ast_channel_snapshot *caller_snapshot,
+ struct ast_channel_snapshot *agent_snapshot,
+ struct stasis_message_type *type, struct ast_json *blob)
{
RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- struct ast_channel_snapshot *caller_snapshot;
- struct ast_channel_snapshot *agent_snapshot;
payload = ast_multi_channel_blob_create(blob);
if (!payload) {
return;
}
- caller_snapshot = ast_channel_snapshot_create(caller);
- agent_snapshot = ast_channel_snapshot_create(agent);
-
- if (!caller_snapshot || !agent_snapshot) {
- return;
- }
-
ast_multi_channel_blob_add_channel(payload, "caller", caller_snapshot);
ast_multi_channel_blob_add_channel(payload, "agent", agent_snapshot);
return;
}
- stasis_publish(ast_channel_topic(caller), msg);
+ stasis_publish(topic, msg);
+}
+
+static void queue_publish_multi_channel_blob(struct ast_channel *caller, struct ast_channel *agent,
+ struct stasis_message_type *type, struct ast_json *blob)
+{
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, agent_snapshot, NULL, ao2_cleanup);
+
+ caller_snapshot = ast_channel_snapshot_create(caller);
+ agent_snapshot = ast_channel_snapshot_create(agent);
+
+ if (!caller_snapshot || !agent_snapshot) {
+ return;
+ }
+
+ queue_publish_multi_channel_snapshot_blob(ast_channel_topic(caller), caller_snapshot,
+ agent_snapshot, type, blob);
}
static void queue_publish_member_blob(struct stasis_message_type *type, struct ast_json *blob)
member_call_pending_clear(tmp->member);
- /* BUGBUG: Raise a BUSY dial end message here */
+ publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
tmp->stillgoing = 0;
++*busies;
return 0;
if (pos == 1 /* not found */) {
if (numlines == (numbusies + numnochan)) {
ast_debug(1, "Everyone is busy at this time\n");
- /* BUGBUG: We shouldn't have to set anything here, as each
- * individual dial attempt should have set that CDR to busy
- */
} else {
ast_debug(3, "No one is answering queue '%s' (%d numlines / %d busies / %d failed channels)\n", queue, numlines, numbusies, numnochan);
- /* BUGBUG: We shouldn't have to set anything here, as each
- * individual dial attempt should have set that CDR to busy
- */
}
*to = 0;
return NULL;
return res;
}
-#if 0 // BUGBUG
/*!
* \brief update the queue status
* \retval Always 0
ao2_unlock(q);
return 0;
}
-#endif // BUGBUG
/*! \brief Calculate the metric of each member in the outgoing callattempts
*
TRANSFER
};
-#if 0 // BUGBUG
/*! \brief Send out AMI message with member call completion status information */
-static void send_agent_complete(const struct queue_ent *qe, const char *queuename,
- const struct ast_channel *peer, const struct member *member, time_t callstart,
- char *vars, size_t vars_len, enum agent_complete_reason rsn)
+static void send_agent_complete(const char *queuename, struct ast_channel_snapshot *caller,
+ struct ast_channel_snapshot *peer, const struct member *member, time_t holdstart,
+ time_t callstart, enum agent_complete_reason rsn)
{
const char *reason = NULL; /* silence dumb compilers */
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
+ ast_assert(peer != NULL);
+ ast_assert(caller != NULL);
+
switch (rsn) {
case CALLER:
reason = "caller";
"Queue", queuename,
"Interface", member->interface,
"MemberName", member->membername,
- "HoldTime", (long)(callstart - qe->start)
- "TalkTime", (long)(time(NULL) - callstart)
+ "HoldTime", (long)(callstart - holdstart),
+ "TalkTime", (long)(time(NULL) - callstart),
"Reason", reason);
- queue_publish_multi_channel_blob(qe->chan, peer, queue_agent_complete_type(), blob);
+
+ queue_publish_multi_channel_snapshot_blob(ast_queue_topic(queuename), caller, peer,
+ queue_agent_complete_type(), blob);
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-struct queue_transfer_ds {
- struct queue_ent *qe;
+static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct ast_channel_blob *agent_blob;
+
+ agent_blob = stasis_message_data(msg);
+
+ if (ast_channel_agent_login_type() == stasis_message_type(msg)) {
+ ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
+ ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
+ "AGENTLOGIN", "%s", agent_blob->snapshot->name);
+ } else if (ast_channel_agent_logoff_type() == stasis_message_type(msg)) {
+ ast_queue_log("NONE", agent_blob->snapshot->uniqueid,
+ ast_json_string_get(ast_json_object_get(agent_blob->blob, "agent")),
+ "AGENTLOGOFF", "%s|%ld", agent_blob->snapshot->name,
+ (long) ast_json_integer_get(ast_json_object_get(agent_blob->blob, "logintime")));
+ }
+}
+
+/*!
+ * \brief Structure representing relevant data during a local channel optimization
+ *
+ * The reason we care about local channel optimizations is that we want to be able
+ * to accurately report when the caller and queue member have stopped talking to
+ * each other. A local channel optimization can cause it to appear that the conversation
+ * has stopped immediately after it has begun. By tracking that the relevant channels
+ * to monitor have changed due to a local channel optimization, we can give accurate
+ * reports.
+ *
+ * Local channel optimizations for queues are restricted from their normal operation.
+ * Bridges created by queues can only be the destination of local channel optimizations,
+ * not the source. In addition, move-swap local channel optimizations are the only
+ * permitted types of local channel optimization.
+ *
+ * This data is populated when we are told that a local channel optimization begin
+ * is occurring. When we get told the optimization has ended successfully, we then
+ * apply the data here into the queue_stasis_data.
+ */
+struct local_optimization {
+ /*! The uniqueid of the channel that will be taking the place of the caller or member */
+ const char *source_chan_uniqueid;
+ /*! Indication of whether we think there is a local channel optimization in progress */
+ int in_progress;
+ /*! The identifier for this local channel optimization */
+ unsigned int id;
+};
+
+/*!
+ * \brief User data for stasis subscriptions used for queue calls.
+ *
+ * app_queue subscribes to channel and bridge events for all bridged calls.
+ * app_queue cares about the following events:
+ *
+ * \li bridge enter: To determine the unique ID of the bridge created for the call.
+ * \li blind transfer: To send an appropriate agent complete event.
+ * \li attended transfer: To send an appropriate agent complete event.
+ * \li local optimization: To update caller and member unique IDs for the call.
+ * \li hangup: To send an appropriate agent complete event.
+ *
+ * The stasis subscriptions last until we determine that the caller and the member
+ * are no longer bridged with each other.
+ */
+struct queue_stasis_data {
+ AST_DECLARE_STRING_FIELDS(
+ /*! The unique ID of the caller's channel. */
+ AST_STRING_FIELD(caller_uniqueid);
+ /*! The unique ID of the queue member's channel */
+ AST_STRING_FIELD(member_uniqueid);
+ /*! The unique ID of the bridge created by the queue */
+ AST_STRING_FIELD(bridge_uniqueid);
+ );
+ /*! The relevant queue */
+ struct call_queue *queue;
+ /*! The queue member that has answered the call */
struct member *member;
+ /*! The time at which the caller entered the queue. Start of the caller's hold time */
+ time_t holdstart;
+ /*! The time at which the member answered the call. */
time_t starttime;
+ /*! The original position of the caller when he entered the queue */
+ int caller_pos;
+ /*! Indication if the call was answered within the configured service level of the queue */
int callcompletedinsl;
+ /*! Indicates if the stasis subscriptions are shutting down */
+ int dying;
+ /*! The stasis message router for bridge events */
+ struct stasis_message_router *bridge_router;
+ /*! The stasis message router for channel events */
+ struct stasis_message_router *channel_router;
+ /*! Local channel optimization details for the caller */
+ struct local_optimization caller_optimize;
+ /*! Local channel optimization details for the member */
+ struct local_optimization member_optimize;
};
-#endif // BUGBUG
-#if 0 // BUGBUG
-static void queue_transfer_destroy(void *data)
+/*!
+ * \internal
+ * \brief Free memory for a queue_stasis_data
+ */
+static void queue_stasis_data_destructor(void *obj)
{
- struct queue_transfer_ds *qtds = data;
- ast_free(qtds);
+ struct queue_stasis_data *queue_data = obj;
+
+ /* This can only happen if refcounts for this object have got severely messed up */
+ ast_assert(queue_data->bridge_router == NULL);
+ ast_assert(queue_data->channel_router == NULL);
+
+ ao2_cleanup(queue_data->member);
+ queue_unref(queue_data->queue);
+ ast_string_field_free_memory(queue_data);
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-/*! \brief a datastore used to help correctly log attended transfers of queue callers
+/*!
+ * \internal
+ * \brief End all stasis subscriptions on a queue_stasis_data
*/
-static const struct ast_datastore_info queue_transfer_info = {
- .type = "queue_transfer",
- .chan_fixup = queue_transfer_fixup,
- .destroy = queue_transfer_destroy,
-};
-#endif // BUGBUG
+static void remove_stasis_subscriptions(struct queue_stasis_data *queue_data)
+{
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ queue_data->dying = 1;
+ stasis_message_router_unsubscribe(queue_data->bridge_router);
+ queue_data->bridge_router = NULL;
+ stasis_message_router_unsubscribe(queue_data->channel_router);
+ queue_data->channel_router = NULL;
+}
+
+/*!
+ * \internal
+ * \brief Allocate a queue_stasis_data and initialize its data.
+ */
+static struct queue_stasis_data *queue_stasis_data_alloc(struct queue_ent *qe,
+ struct ast_channel *peer, struct member *mem, time_t holdstart,
+ time_t starttime, int callcompletedinsl)
+{
+ struct queue_stasis_data *queue_data;
+
+ queue_data = ao2_alloc(sizeof(*queue_data), queue_stasis_data_destructor);
+ if (!queue_data) {
+ return NULL;
+ }
+
+ if (ast_string_field_init(queue_data, 64)) {
+ ao2_cleanup(queue_data);
+ return NULL;
+ }
-#if 0 // BUGBUG
-/*! \brief Log an attended transfer when a queue caller channel is masqueraded
+ ast_string_field_set(queue_data, caller_uniqueid, ast_channel_uniqueid(qe->chan));
+ ast_string_field_set(queue_data, member_uniqueid, ast_channel_uniqueid(peer));
+ queue_data->queue = queue_ref(qe->parent);
+ queue_data->starttime = starttime;
+ queue_data->holdstart = holdstart;
+ queue_data->callcompletedinsl = callcompletedinsl;
+ queue_data->caller_pos = qe->opos;
+ ao2_ref(mem, +1);
+ queue_data->member = mem;
+ return queue_data;
+}
+
+/*!
+ * \internal
+ * \brief Log an attended transfer in the queue log.
*
- * When a caller is masqueraded, we want to log a transfer. Fixup time is the closest we can come to when
- * the actual transfer occurs. This happens during the masquerade after datastores are moved from old_chan
- * to new_chan. This is why new_chan is referenced for exten, context, and datastore information.
+ * Attended transfer queue log messages vary based on the method by which the
+ * attended transfer was completed.
*
- * At the end of this, we want to remove the datastore so that this fixup function is not called on any
- * future masquerades of the caller during the current call.
+ * \param queue_data Data pertaining to the particular call in the queue.
+ * \param caller The channel snapshot for the caller channel in the queue.
+ * \param atxfer_msg The stasis attended transfer message data.
*/
-static void queue_transfer_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+static void log_attended_transfer(struct queue_stasis_data *queue_data, struct ast_channel_snapshot *caller,
+ struct ast_attended_transfer_message *atxfer_msg)
{
- struct queue_transfer_ds *qtds = data;
- struct queue_ent *qe = qtds->qe;
- struct member *member = qtds->member;
- time_t callstart = qtds->starttime;
- int callcompletedinsl = qtds->callcompletedinsl;
- struct ast_datastore *datastore;
+ RAII_VAR(struct ast_str *, transfer_str, ast_str_create(32), ast_free);
+
+ if (!transfer_str) {
+ ast_log(LOG_WARNING, "Unable to log attended transfer to queue log\n");
+ return;
+ }
+
+ switch (atxfer_msg->dest_type) {
+ case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
+ ast_str_set(&transfer_str, 0, "BRIDGE|%s", atxfer_msg->dest.bridge);
+ break;
+ case AST_ATTENDED_TRANSFER_DEST_APP:
+ ast_str_set(&transfer_str, 0, "APP|%s", atxfer_msg->dest.app);
+ break;
+ case AST_ATTENDED_TRANSFER_DEST_LINK:
+ ast_str_set(&transfer_str, 0, "LINK|%s|%s", atxfer_msg->dest.links[0]->name,
+ atxfer_msg->dest.links[1]->name);
+ break;
+ case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
+ case AST_ATTENDED_TRANSFER_DEST_FAIL:
+ /* Threeways are headed off and should not be logged here */
+ ast_assert(0);
+ return;
+ }
+
+ ast_queue_log(queue_data->queue->name, caller->uniqueid, queue_data->member->membername, "ATTENDEDTRANSFER", "%s|%ld|%ld|%d",
+ ast_str_buffer(transfer_str),
+ (long) queue_data->starttime - queue_data->holdstart,
+ (long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
+}
- ast_queue_log(qe->parent->name, ast_channel_uniqueid(qe->chan), member->membername, "TRANSFER", "%s|%s|%ld|%ld|%d",
- ast_channel_exten(new_chan), ast_channel_context(new_chan), (long) (callstart - qe->start),
- (long) (time(NULL) - callstart), qe->opos);
+/*!
+ * \internal
+ * \brief Handle a stasis bridge enter event.
+ *
+ * We track this particular event in order to learn what bridge
+ * was created for the queue call.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the bridge enter event
+ */
+static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (!ast_strlen_zero(queue_data->bridge_uniqueid)) {
+ return;
+ }
- update_queue(qe->parent, member, callcompletedinsl, (time(NULL) - callstart));
+ if (!strcmp(enter_blob->channel->uniqueid, queue_data->caller_uniqueid)) {
+ ast_string_field_set(queue_data, bridge_uniqueid,
+ enter_blob->bridge->uniqueid);
+ ast_debug(3, "Detected entry of caller channel %s into bridge %s\n",
+ enter_blob->channel->name, queue_data->bridge_uniqueid);
+ }
+}
- /* No need to lock the channels because they are already locked in ast_do_masquerade */
- if ((datastore = ast_channel_datastore_find(old_chan, &queue_transfer_info, NULL))) {
- ast_channel_datastore_remove(old_chan, datastore);
+/*!
+ * \brief Handle a blind transfer event
+ *
+ * This event is important in order to be able to log the end of the
+ * call to the queue log and to stasis.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the blind transfer event
+ */
+static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
+ struct ast_json *result_blob;
+ struct ast_json *exten_blob;
+ struct ast_json *context_blob;
+ const char *exten;
+ const char *context;
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ result_blob = ast_json_object_get(blind_blob->blob, "result");
+ if (!result_blob) {
+ return;
+ }
+
+ if (ast_json_integer_get(result_blob) == AST_BRIDGE_TRANSFER_FAIL) {
+ return;
+ }
+
+ ao2_lock(queue_data);
+
+ if (ast_strlen_zero(queue_data->bridge_uniqueid) ||
+ strcmp(queue_data->bridge_uniqueid, blind_blob->bridge->uniqueid)) {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+ member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+ ao2_unlock(queue_data);
+
+ exten_blob = ast_json_object_get(blind_blob->blob, "exten");
+ exten = exten_blob ? ast_json_string_get(exten_blob) : "<unknown>";
+ context_blob = ast_json_object_get(blind_blob->blob, "context");
+ context = context_blob ? ast_json_string_get(context_blob) : "<unknown>";
+
+ ast_debug(3, "Detected blind transfer in queue %s\n", queue_data->queue->name);
+ ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
+ "BLINDTRANSFER", "%s|%s|%ld|%ld|%d",
+ exten, context,
+ (long) queue_data->starttime - queue_data->holdstart,
+ (long) time(NULL) - queue_data->starttime, queue_data->caller_pos);
+
+ send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+ queue_data->holdstart, queue_data->starttime, TRANSFER);
+ update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+ time(NULL) - queue_data->starttime);
+ remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \brief Handle an attended transfer event
+ *
+ * This event is important in order to be able to log the end of the
+ * call to the queue log and to stasis.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the attended transfer event.
+ */
+static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (atxfer_msg->result == AST_BRIDGE_TRANSFER_FAIL ||
+ atxfer_msg->dest_type == AST_ATTENDED_TRANSFER_DEST_THREEWAY) {
+ return;
+ }
+
+ ao2_lock(queue_data);
+
+ if (ast_strlen_zero(queue_data->bridge_uniqueid)) {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ if ((!atxfer_msg->to_transferee.bridge_snapshot || strcmp(queue_data->bridge_uniqueid,
+ atxfer_msg->to_transferee.bridge_snapshot->uniqueid)) &&
+ (!atxfer_msg->to_transfer_target.bridge_snapshot || strcmp(queue_data->bridge_uniqueid,
+ atxfer_msg->to_transfer_target.bridge_snapshot->uniqueid))) {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+ member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+ ao2_unlock(queue_data);
+
+ ast_debug(3, "Detected attended transfer in queue %s\n", queue_data->queue->name);
+
+ log_attended_transfer(queue_data, caller_snapshot, atxfer_msg);
+
+ send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+ queue_data->holdstart, queue_data->starttime, TRANSFER);
+ update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+ time(NULL) - queue_data->starttime);
+ remove_stasis_subscriptions(queue_data);
+}
+
+/*!
+ * \internal
+ * \brief Callback for all stasis bridge events
+ *
+ * Based on the event and what bridge it is on, the task is farmed out to relevant
+ * subroutines for further processing.
+ */
+static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ if (stasis_subscription_final_message(sub, msg)) {
+ ao2_cleanup(userdata);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Handler for the beginning of a local channel optimization
+ *
+ * This method gathers data relevant to the local channel optimization and stores
+ * it to be used once the local optimization completes.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the local optimization begin event
+ */
+static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
+ struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
+ struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
+ struct ast_channel_snapshot *source = ast_multi_channel_blob_get_channel(optimization_blob, "source");
+ struct local_optimization *optimization;
+ unsigned int id;
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
+ optimization = &queue_data->member_optimize;
+ } else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
+ optimization = &queue_data->caller_optimize;
} else {
- ast_log(LOG_WARNING, "Can't find the queue_transfer datastore.\n");
+ return;
+ }
+
+ /* We only allow move-swap optimizations, so there had BETTER be a source */
+ ast_assert(source != NULL);
+
+ optimization->source_chan_uniqueid = ast_strdup(source->uniqueid);
+ if (!optimization->source_chan_uniqueid) {
+ ast_log(LOG_ERROR, "Unable to track local channel optimization for channel %s. Expect further errors\n", local_one->name);
+ return;
}
+ id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
+
+ optimization->id = id;
+ optimization->in_progress = 1;
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-/*! \brief mechanism to tell if a queue caller was atxferred by a queue member.
+/*!
+ * \internal
+ * \brief Handler for the end of a local channel optimization
*
- * When a caller is atxferred, then the queue_transfer_info datastore
- * is removed from the channel. If it's still there after the bridge is
- * broken, then the caller was not atxferred.
+ * This method takes the data gathered during the local channel optimization begin
+ * event and applies it to the queue stasis data appropriately. This generally involves
+ * updating the caller or member unique ID with the channel that is taking the place of
+ * the previous caller or member.
*
- * \note Only call this with chan locked
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the local optimization end event
*/
-static int attended_transfer_occurred(struct ast_channel *chan)
+static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
+ struct ast_channel_snapshot *local_one = ast_multi_channel_blob_get_channel(optimization_blob, "1");
+ struct ast_channel_snapshot *local_two = ast_multi_channel_blob_get_channel(optimization_blob, "2");
+ struct local_optimization *optimization;
+ int is_caller;
+ unsigned int id;
+ SCOPED_AO2LOCK(lock, queue_data);
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ if (!strcmp(local_one->uniqueid, queue_data->member_uniqueid)) {
+ optimization = &queue_data->member_optimize;
+ is_caller = 0;
+ } else if (!strcmp(local_two->uniqueid, queue_data->caller_uniqueid)) {
+ optimization = &queue_data->caller_optimize;
+ is_caller = 1;
+ } else {
+ return;
+ }
+
+ id = ast_json_integer_get(ast_json_object_get(ast_multi_channel_blob_get_json(optimization_blob), "id"));
+
+ if (!optimization->in_progress) {
+ ast_log(LOG_WARNING, "Told of a local optimization end when we had no previous begin\n");
+ return;
+ }
+
+ if (id != optimization->id) {
+ ast_log(LOG_WARNING, "Local optimization end event ID does not match begin (%u != %u)\n",
+ id, optimization->id);
+ return;
+ }
+
+ if (is_caller) {
+ ast_debug(3, "Local optimization: Changing queue caller uniqueid from %s to %s\n",
+ queue_data->caller_uniqueid, optimization->source_chan_uniqueid);
+ ast_string_field_set(queue_data, caller_uniqueid, optimization->source_chan_uniqueid);
+ } else {
+ ast_debug(3, "Local optimization: Changing queue member uniqueid from %s to %s\n",
+ queue_data->member_uniqueid, optimization->source_chan_uniqueid);
+ ast_string_field_set(queue_data, member_uniqueid, optimization->source_chan_uniqueid);
+ }
+
+ optimization->in_progress = 0;
+}
+
+/*!
+ * \internal
+ * \brief Handler for hangup stasis event
+ *
+ * This is how we determine that the caller or member has hung up and the call
+ * has ended. An appropriate queue log and stasis message are raised in this
+ * callback.
+ *
+ * \param userdata Data pertaining to the particular call in the queue.
+ * \param sub The stasis subscription on which the message occurred.
+ * \param topic The topic for this event.
+ * \param msg The stasis message for the hangup event.
+ */
+static void handle_hangup(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
{
- return ast_channel_datastore_find(chan, &queue_transfer_info, NULL) ? 0 : 1;
+ struct queue_stasis_data *queue_data = userdata;
+ struct ast_channel_blob *channel_blob = stasis_message_data(msg);
+ RAII_VAR(struct ast_channel_snapshot *, caller_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel_snapshot *, member_snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel *, chan, NULL, ao2_cleanup);
+ enum agent_complete_reason reason;
+
+ if (queue_data->dying) {
+ return;
+ }
+
+ ao2_lock(queue_data);
+
+ if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->caller_uniqueid)) {
+ reason = CALLER;
+ } else if (!strcmp(channel_blob->snapshot->uniqueid, queue_data->member_uniqueid)) {
+ reason = AGENT;
+ } else {
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ chan = ast_channel_get_by_name(channel_blob->snapshot->name);
+ if (chan && ast_channel_has_role(chan, AST_TRANSFERER_ROLE_NAME)) {
+ /* Channel that is hanging up is doing it as part of a transfer.
+ * We'll get a transfer event later
+ */
+ ao2_unlock(queue_data);
+ return;
+ }
+
+ caller_snapshot = ast_channel_snapshot_get_latest(queue_data->caller_uniqueid);
+ member_snapshot = ast_channel_snapshot_get_latest(queue_data->member_uniqueid);
+
+ ao2_unlock(queue_data);
+
+ ast_debug(3, "Detected hangup of queue %s channel %s\n", reason == CALLER ? "caller" : "member",
+ channel_blob->snapshot->name);
+
+ ast_queue_log(queue_data->queue->name, caller_snapshot->uniqueid, queue_data->member->membername,
+ reason == CALLER ? "COMPLETECALLER" : "COMPLETEAGENT", "%ld|%ld|%d",
+ (long) (queue_data->starttime - queue_data->holdstart),
+ (long) (time(NULL) - queue_data->starttime), queue_data->caller_pos);
+
+ send_agent_complete(queue_data->queue->name, caller_snapshot, member_snapshot, queue_data->member,
+ queue_data->holdstart, queue_data->starttime, reason);
+ update_queue(queue_data->queue, queue_data->member, queue_data->callcompletedinsl,
+ time(NULL) - queue_data->starttime);
+ remove_stasis_subscriptions(queue_data);
}
-#endif // BUGBUG
-#if 0 // BUGBUG
-/*! \brief create a datastore for storing relevant info to log attended transfers in the queue_log
+/*!
+ * \internal
+ * \brief Callback for all stasis channel events
+ *
+ * Based on the event and the channels involved, the work is farmed out into
+ * subroutines for further processing.
*/
-static struct ast_datastore *setup_transfer_datastore(struct queue_ent *qe, struct member *member, time_t starttime, int callcompletedinsl)
+static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *msg)
{
- struct ast_datastore *ds;
- struct queue_transfer_ds *qtds = ast_calloc(1, sizeof(*qtds));
+ if (stasis_subscription_final_message(sub, msg)) {
+ ao2_cleanup(userdata);
+ }
+}
- if (!qtds) {
- ast_log(LOG_WARNING, "Memory allocation error!\n");
- return NULL;
+/*!
+ * \internal
+ * \brief Create stasis subscriptions for a particular call in the queue.
+ *
+ * These subscriptions are created once the call has been answered. The subscriptions
+ * are put in place so that call progress may be tracked. Once the call can be determined
+ * to have ended, then messages are logged to the queue log and stasis events are emitted.
+ *
+ * \param qe The queue entry representing the caller
+ * \param peer The channel that has answered the call
+ * \param mem The queue member that answered the call
+ * \param holdstart The time at which the caller entered the queue
+ * \param starttime The time at which the call was answered
+ * \param callcompletedinsl Indicates if the call was answered within the configured service level of the queue.
+ * \retval 0 Success
+ * \retval non-zero Failure
+ */
+static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, struct member *mem,
+ time_t holdstart, time_t starttime, int callcompletedinsl)
+{
+ struct queue_stasis_data *queue_data = queue_stasis_data_alloc(qe, peer, mem, holdstart, starttime, callcompletedinsl);
+
+ if (!queue_data) {
+ return -1;
}
- ast_channel_lock(qe->chan);
- if (!(ds = ast_datastore_alloc(&queue_transfer_info, NULL))) {
- ast_channel_unlock(qe->chan);
- ast_free(qtds);
- ast_log(LOG_WARNING, "Unable to create transfer datastore. queue_log will not show attended transfer\n");
- return NULL;
+ queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+ if (!queue_data->bridge_router) {
+ ao2_ref(queue_data, -1);
+ return -1;
}
- qtds->qe = qe;
- /* This member is refcounted in try_calling, so no need to add it here, too */
- qtds->member = member;
- qtds->starttime = starttime;
- qtds->callcompletedinsl = callcompletedinsl;
- ds->data = qtds;
- ast_channel_datastore_add(qe->chan, ds);
- ast_channel_unlock(qe->chan);
- return ds;
+ stasis_message_router_add(queue_data->bridge_router, ast_channel_entered_bridge_type(),
+ handle_bridge_enter, queue_data);
+ stasis_message_router_add(queue_data->bridge_router, ast_blind_transfer_type(),
+ handle_blind_transfer, queue_data);
+ stasis_message_router_add(queue_data->bridge_router, ast_attended_transfer_type(),
+ handle_attended_transfer, queue_data);
+ stasis_message_router_set_default(queue_data->bridge_router,
+ queue_bridge_cb, queue_data);
+
+ queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+ if (!queue_data->channel_router) {
+ /* Unsubscribing from the bridge router will remove the only ref of queue_data,
+ * thus beginning the destruction process
+ */
+ stasis_message_router_unsubscribe(queue_data->bridge_router);
+ queue_data->bridge_router = NULL;
+ return -1;
+ }
+
+ ao2_ref(queue_data, +1);
+ stasis_message_router_add(queue_data->channel_router, ast_local_optimization_begin_type(),
+ handle_local_optimization_begin, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_local_optimization_end_type(),
+ handle_local_optimization_end, queue_data);
+ stasis_message_router_add(queue_data->channel_router, ast_channel_hangup_request_type(),
+ handle_hangup, queue_data);
+ stasis_message_router_set_default(queue_data->channel_router,
+ queue_channel_cb, queue_data);
+
+ return 0;
}
-#endif // BUGBUG
struct queue_end_bridge {
struct call_queue *q;
}
}
+static void escape_and_substitute(struct ast_channel *chan, const char *input,
+ char *output, size_t size)
+{
+ const char *m = input;
+ char escaped[size];
+ char *p;
+
+ for (p = escaped; p < escaped + size - 1; p++, m++) {
+ switch (*m) {
+ case '^':
+ if (*(m + 1) == '{') {
+ *p = '$';
+ }
+ break;
+ case ',':
+ *p++ = '\\';
+ /* Fall through */
+ default:
+ *p = *m;
+ }
+ if (*m == '\0')
+ break;
+ }
+
+ if (p == escaped + size) {
+ escaped[size - 1] = '\0';
+ }
+
+ pbx_substitute_variables_helper(chan, escaped, output, size - 1);
+}
+
+static void setup_mixmonitor(struct queue_ent *qe, const char *filename)
+{
+ char escaped_filename[256];
+ char file_with_ext[256];
+ char mixmonargs[1512];
+ char escaped_monitor_exec[1024];
+ const char *monitor_options;
+ const char *monitor_exec;
+
+ if (filename) {
+ escape_and_substitute(qe->chan, filename, escaped_filename, sizeof(escaped_filename));
+ } else {
+ ast_copy_string(escaped_filename, ast_channel_uniqueid(qe->chan), sizeof(escaped_filename));
+ }
+
+ ast_channel_lock(qe->chan);
+ if ((monitor_exec = pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC"))) {
+ monitor_exec = ast_strdupa(monitor_exec);
+ }
+ if ((monitor_options = pbx_builtin_getvar_helper(qe->chan, "MONITOR_OPTIONS"))) {
+ monitor_options = ast_strdupa(monitor_options);
+ } else {
+ monitor_options = "";
+ }
+ ast_channel_unlock(qe->chan);
+
+ if (monitor_exec) {
+ escape_and_substitute(qe->chan, monitor_exec, escaped_monitor_exec, sizeof(escaped_monitor_exec));
+ }
+
+ snprintf(file_with_ext, sizeof(file_with_ext), "%s.%s", escaped_filename, qe->parent->monfmt);
+
+ if (!ast_strlen_zero(escaped_monitor_exec)) {
+ snprintf(mixmonargs, sizeof(mixmonargs), "b%s,%s", monitor_options, escaped_monitor_exec);
+ } else {
+ snprintf(mixmonargs, sizeof(mixmonargs), "b%s", monitor_options);
+ }
+
+ ast_debug(1, "Arguments being passed to MixMonitor: %s,%s\n", file_with_ext, mixmonargs);
+
+ if (ast_start_mixmonitor(qe->chan, file_with_ext, mixmonargs)) {
+ ast_log(LOG_WARNING, "Unable to start mixmonitor. Is the MixMonitor app loaded?\n");
+ }
+}
+
/*!
* \internal
* \brief A large function which calls members, updates statistics, and bridges the caller and a member
int x=0;
char *announce = NULL;
char digit = 0;
-#if 0 // BUGBUG
time_t callstart;
-#endif // BUGBUG
time_t now = time(NULL);
struct ast_bridge_config bridge_config;
char nondataquality = 1;
char *macroexec = NULL;
char *gosubexec = NULL;
const char *monitorfilename;
- const char *monitor_exec;
- const char *monitor_options;
- char tmpid[256], tmpid2[256];
- char meid[1024], meid2[1024];
- char mixmonargs[1512];
- struct ast_app *mixmonapp = NULL;
- char *p;
+ char tmpid[256];
+ char meid[1024];
int forwardsallowed = 1;
int block_connected_line = 0;
-#if 0 // BUGBUG
int callcompletedinsl;
-#endif // BUGBUG
struct ao2_iterator memi;
struct ast_datastore *datastore;
-#if 0 // BUGBUG
- struct ast_datastore *transfer_ds;
-#endif // BUGBUG
struct queue_end_bridge *queue_end_bridge = NULL;
ast_channel_lock(qe->chan);
time(&now);
recalc_holdtime(qe, (now - qe->start));
ao2_lock(qe->parent);
-#if 0 // BUGBUG
callcompletedinsl = ((now - qe->start) <= qe->parent->servicelevel);
-#endif // BUGBUG
ao2_unlock(qe->parent);
member = lpeer->member;
/* Increment the refcount for this member, since we're going to be using it for awhile in here. */
ast_monitor_setjoinfiles(which, 1);
}
} else {
- mixmonapp = pbx_findapp("MixMonitor");
-
- if (mixmonapp) {
- ast_debug(1, "Starting MixMonitor as requested.\n");
- if (!monitorfilename) {
- if (qe->chan) {
- ast_copy_string(tmpid, ast_channel_uniqueid(qe->chan), sizeof(tmpid));
- } else {
- snprintf(tmpid, sizeof(tmpid), "chan-%lx", ast_random());
- }
- } else {
- const char *m = monitorfilename;
- for (p = tmpid2; p < tmpid2 + sizeof(tmpid2) - 1; p++, m++) {
- switch (*m) {
- case '^':
- if (*(m + 1) == '{')
- *p = '$';
- break;
- case ',':
- *p++ = '\\';
- /* Fall through */
- default:
- *p = *m;
- }
- if (*m == '\0')
- break;
- }
- if (p == tmpid2 + sizeof(tmpid2))
- tmpid2[sizeof(tmpid2) - 1] = '\0';
-
- pbx_substitute_variables_helper(qe->chan, tmpid2, tmpid, sizeof(tmpid) - 1);
- }
-
- ast_channel_lock(qe->chan);
- if ((monitor_exec = pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC"))) {
- monitor_exec = ast_strdupa(monitor_exec);
- }
- if ((monitor_options = pbx_builtin_getvar_helper(qe->chan, "MONITOR_OPTIONS"))) {
- monitor_options = ast_strdupa(monitor_options);
- } else {
- monitor_options = "";
- }
- ast_channel_unlock(qe->chan);
-
- if (monitor_exec) {
- const char *m = monitor_exec;
- for (p = meid2; p < meid2 + sizeof(meid2) - 1; p++, m++) {
- switch (*m) {
- case '^':
- if (*(m + 1) == '{')
- *p = '$';
- break;
- case ',':
- *p++ = '\\';
- /* Fall through */
- default:
- *p = *m;
- }
- if (*m == '\0') {
- break;
- }
- }
- if (p == meid2 + sizeof(meid2)) {
- meid2[sizeof(meid2) - 1] = '\0';
- }
-
- pbx_substitute_variables_helper(qe->chan, meid2, meid, sizeof(meid) - 1);
- }
-
- snprintf(tmpid2, sizeof(tmpid2), "%s.%s", tmpid, qe->parent->monfmt);
-
- if (!ast_strlen_zero(monitor_exec)) {
- snprintf(mixmonargs, sizeof(mixmonargs), "%s,b%s,%s", tmpid2, monitor_options, monitor_exec);
- } else {
- snprintf(mixmonargs, sizeof(mixmonargs), "%s,b%s", tmpid2, monitor_options);
- }
-
- ast_debug(1, "Arguments being passed to MixMonitor: %s\n", mixmonargs);
- /* BUGBUG
- * This needs to be done differently. We need to start a MixMonitor on
- * the actual queue bridge itself, not drop some channel out and pull it
- * back. Once the media channel work is done, start a media channel on
- * the bridge.
- *
- * Alternatively, don't use pbx_exec to put an audio hook on a channel.
- */
- pbx_exec(qe->chan, mixmonapp, mixmonargs);
- } else {
- ast_log(LOG_WARNING, "Asked to run MixMonitor on this call, but cannot find the MixMonitor app!\n");
- }
+ setup_mixmonitor(qe, monitorfilename);
}
}
/* Drop out of the queue at this point, to prepare for next caller */
queue_t_ref(qe->parent, "For bridge_config reference");
}
-#if 0 // BUGBUG
time(&callstart);
- transfer_ds = setup_transfer_datastore(qe, member, callstart, callcompletedinsl);
-#endif // BUGBUG
- bridge = ast_bridge_call(qe->chan, peer, &bridge_config);
-
-/* BUGBUG need to do this queue logging a different way because we cannot reference peer anymore. Likely needs to be made a subscriber of stasis transfer events. */
-#if 0 // BUGBUG
- /* If the queue member did an attended transfer, then the TRANSFER already was logged in the queue_log
- * when the masquerade occurred. These other "ending" queue_log messages are unnecessary, except for
- * the AgentComplete manager event
- */
- ast_channel_lock(qe->chan);
- if (!attended_transfer_occurred(qe->chan)) {
- struct ast_datastore *tds;
-
- /* detect a blind transfer */
- if (!(ast_channel_softhangup_internal_flag(qe->chan) | ast_channel_softhangup_internal_flag(peer)) && (strcasecmp(oldcontext, ast_channel_context(qe->chan)) || strcasecmp(oldexten, ast_channel_exten(qe->chan)))) {
- ast_queue_log(queuename, ast_channel_uniqueid(qe->chan), member->membername, "TRANSFER", "%s|%s|%ld|%ld|%d",
- ast_channel_exten(qe->chan), ast_channel_context(qe->chan), (long) (callstart - qe->start),
- (long) (time(NULL) - callstart), qe->opos);
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), TRANSFER);
- } else if (ast_check_hangup(qe->chan) && !ast_check_hangup(peer)) {
- ast_queue_log(queuename, ast_channel_uniqueid(qe->chan), member->membername, "COMPLETECALLER", "%ld|%ld|%d",
- (long) (callstart - qe->start), (long) (time(NULL) - callstart), qe->opos);
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), CALLER);
- } else {
- ast_queue_log(queuename, ast_channel_uniqueid(qe->chan), member->membername, "COMPLETEAGENT", "%ld|%ld|%d",
- (long) (callstart - qe->start), (long) (time(NULL) - callstart), qe->opos);
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), AGENT);
- }
- if ((tds = ast_channel_datastore_find(qe->chan, &queue_transfer_info, NULL))) {
- ast_channel_datastore_remove(qe->chan, tds);
- }
- ast_channel_unlock(qe->chan);
- update_queue(qe->parent, member, callcompletedinsl, (time(NULL) - callstart));
- } else {
- ast_channel_unlock(qe->chan);
-
- /* We already logged the TRANSFER on the queue_log, but we still need to send the AgentComplete event */
- send_agent_complete(qe, queuename, peer, member, callstart, vars, sizeof(vars), TRANSFER);
- }
-
- if (transfer_ds) {
- ast_datastore_free(transfer_ds);
- }
-#endif // BUGBUG
+ setup_stasis_subs(qe, peer, member, qe->start, callstart, callcompletedinsl);
+ bridge = ast_bridge_call_with_flags(qe->chan, peer, &bridge_config,
+ AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM);
res = bridge ? bridge : 1;
ao2_ref(member, -1);
AST_DATA_ENTRY("asterisk/application/queue/list", &queues_data_provider),
};
+static struct stasis_message_router *agent_router;
+static struct stasis_subscription *topic_forwarder;
+
static int unload_module(void)
{
int res;
stasis_message_router_remove(message_router, queue_agent_dump_type());
stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
}
+ stasis_message_router_unsubscribe_and_join(agent_router);
+ topic_forwarder = stasis_unsubscribe(topic_forwarder);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);
int res;
struct ast_flags mask = {AST_FLAGS_ALL, };
struct ast_config *member_config;
- struct stasis_message_router *message_router;
+ struct stasis_message_router *manager_router;
+ struct stasis_topic *queue_topic;
+ struct stasis_topic *manager_topic;
queues = ao2_container_alloc(MAX_QUEUE_BUCKETS, queue_hash_cb, queue_cmp_cb);
res = -1;
}
- message_router = ast_manager_get_message_router();
- if (!message_router) {
+ manager_topic = ast_manager_get_topic();
+ if (!manager_topic) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ manager_router = ast_manager_get_message_router();
+ if (!manager_router) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ queue_topic = ast_queue_topic_all();
+ if (!queue_topic) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ topic_forwarder = stasis_forward_all(queue_topic, manager_topic);
+ if (!topic_forwarder) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
+ agent_router = stasis_message_router_create(ast_channel_topic_all());
+ if (!agent_router) {
return AST_MODULE_LOAD_DECLINE;
}
STASIS_MESSAGE_TYPE_INIT(queue_agent_dump_type);
STASIS_MESSAGE_TYPE_INIT(queue_agent_ringnoanswer_type);
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_caller_join_type(),
queue_channel_manager_event,
"QueueCallerJoin");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_caller_leave_type(),
queue_channel_manager_event,
"QueueCallerLeave");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_caller_abandon_type(),
queue_channel_manager_event,
"QueueCallerAbandon");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_status_type(),
queue_member_manager_event,
"QueueMemberStatus");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_added_type(),
queue_member_manager_event,
"QueueMemberAdded");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_removed_type(),
queue_member_manager_event,
"QueueMemberRemoved");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_pause_type(),
queue_member_manager_event,
"QueueMemberPause");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_penalty_type(),
queue_member_manager_event,
"QueueMemberPenalty");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_member_ringinuse_type(),
queue_member_manager_event,
"QueueMemberRinginuse");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_called_type(),
queue_multi_channel_manager_event,
"AgentCalled");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_connect_type(),
queue_multi_channel_manager_event,
"AgentConnect");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_complete_type(),
queue_multi_channel_manager_event,
"AgentComplete");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_dump_type(),
queue_multi_channel_manager_event,
"AgentDump");
- stasis_message_router_add(message_router,
+ stasis_message_router_add(manager_router,
queue_agent_ringnoanswer_type(),
queue_multi_channel_manager_event,
"AgentRingNoAnswer");
+ stasis_message_router_add(agent_router,
+ ast_channel_agent_login_type(),
+ queue_agent_cb,
+ NULL);
+
+ stasis_message_router_add(agent_router,
+ ast_channel_agent_logoff_type(),
+ queue_agent_cb,
+ NULL);
+
ast_extension_state_add(NULL, NULL, extension_state_cb, NULL);
return res ? AST_MODULE_LOAD_DECLINE : 0;