Make app_queue and res_agi independent of AMI being enabled.
authorRichard Mudgett <rmudgett@digium.com>
Tue, 8 Oct 2013 15:12:46 +0000 (15:12 +0000)
committerRichard Mudgett <rmudgett@digium.com>
Tue, 8 Oct 2013 15:12:46 +0000 (15:12 +0000)
The https://reviewboard.asterisk.org/r/2888/ review changes manager to not
subscribe to stasis when it is disabled for performance reasons.  When
manager is disabled app_queue and res_agi decline to load and fail to
clean up what they have already allocated.

* Made app_queue and res_agi clean up allocated resources when they
decline to load.

* Made app_queue and res_agi use their own subscriptions to the stasis
topics instead of borrowing manager's message router structure
inappropriately.

(closes issue ASTERISK-22604)
Reported by: rmudgett

Review: https://reviewboard.asterisk.org/r/2902/
........

Merged revisions 400671 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400672 65c4cc65-6c06-0410-ace0-fbb531ad65f3

apps/app_queue.c
res/res_agi.c

index e37f230..10a1330 100644 (file)
@@ -1818,54 +1818,116 @@ static inline void insert_entry(struct call_queue *q, struct queue_ent *prev, st
        new->opos = *pos;
 }
 
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_caller_join_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_caller_leave_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_caller_abandon_type);
-
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_status_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_added_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_removed_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_pause_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_penalty_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_ringinuse_type);
-
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_called_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_connect_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_complete_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_dump_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type);
-
-static void queue_channel_manager_event(void *data,
-       struct stasis_subscription *sub,
-       struct stasis_message *message)
-{
-       const char *type = data;
+static struct ast_manager_event_blob *queue_channel_to_ami(const char *type, struct stasis_message *message)
+{
        struct ast_channel_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+       RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
        RAII_VAR(struct ast_str *, event_string, NULL, ast_free);
 
-       channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
-       if (!channel_event_string) {
-               return;
+       channel_string = ast_manager_build_channel_state_string(obj->snapshot);
+       event_string = ast_manager_str_from_json_object(obj->blob, NULL);
+       if (!channel_string || !event_string) {
+               return NULL;
        }
 
-       event_string = ast_manager_str_from_json_object(obj->blob, NULL);
+       return ast_manager_event_blob_create(EVENT_FLAG_AGENT, type,
+               "%s"
+               "%s",
+               ast_str_buffer(channel_string),
+               ast_str_buffer(event_string));
+}
+
+static struct ast_manager_event_blob *queue_caller_join_to_ami(struct stasis_message *message)
+{
+       return queue_channel_to_ami("QueueCallerJoin", message);
+}
+
+static struct ast_manager_event_blob *queue_caller_leave_to_ami(struct stasis_message *message)
+{
+       return queue_channel_to_ami("QueueCallerLeave", message);
+}
+
+static struct ast_manager_event_blob *queue_caller_abandon_to_ami(struct stasis_message *message)
+{
+       return queue_channel_to_ami("QueueCallerAbandon", message);
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_caller_join_type,
+       .to_ami = queue_caller_join_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_caller_leave_type,
+       .to_ami = queue_caller_leave_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_caller_abandon_type,
+       .to_ami = queue_caller_abandon_to_ami,
+       );
+
+static struct ast_manager_event_blob *queue_member_to_ami(const char *type, struct stasis_message *message)
+{
+       struct ast_json_payload *payload = stasis_message_data(message);
+       RAII_VAR(struct ast_str *, event_string, NULL, ast_free);
+
+       event_string = ast_manager_str_from_json_object(payload->json, NULL);
        if (!event_string) {
-               return;
+               return NULL;
        }
 
-       manager_event(EVENT_FLAG_AGENT, type,
-               "%s"
+       return ast_manager_event_blob_create(EVENT_FLAG_AGENT, type,
                "%s",
-               ast_str_buffer(channel_event_string),
                ast_str_buffer(event_string));
 }
 
-static void queue_multi_channel_manager_event(void *data,
-       struct stasis_subscription *sub,
-       struct stasis_message *message)
+static struct ast_manager_event_blob *queue_member_status_to_ami(struct stasis_message *message)
+{
+       return queue_member_to_ami("QueueMemberStatus", message);
+}
+
+static struct ast_manager_event_blob *queue_member_added_to_ami(struct stasis_message *message)
+{
+       return queue_member_to_ami("QueueMemberAdded", message);
+}
+
+static struct ast_manager_event_blob *queue_member_removed_to_ami(struct stasis_message *message)
+{
+       return queue_member_to_ami("QueueMemberRemoved", message);
+}
+
+static struct ast_manager_event_blob *queue_member_pause_to_ami(struct stasis_message *message)
+{
+       return queue_member_to_ami("QueueMemberPause", message);
+}
+
+static struct ast_manager_event_blob *queue_member_penalty_to_ami(struct stasis_message *message)
+{
+       return queue_member_to_ami("QueueMemberPenalty", message);
+}
+
+static struct ast_manager_event_blob *queue_member_ringinuse_to_ami(struct stasis_message *message)
+{
+       return queue_member_to_ami("QueueMemberRinginuse", message);
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_status_type,
+       .to_ami = queue_member_status_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_added_type,
+       .to_ami = queue_member_added_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_removed_type,
+       .to_ami = queue_member_removed_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_pause_type,
+       .to_ami = queue_member_pause_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_penalty_type,
+       .to_ami = queue_member_penalty_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_member_ringinuse_type,
+       .to_ami = queue_member_ringinuse_to_ami,
+       );
+
+static struct ast_manager_event_blob *queue_multi_channel_to_ami(const char *type, struct stasis_message *message)
 {
-       const char *type = data;
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
        struct ast_channel_snapshot *caller;
        struct ast_channel_snapshot *agent;
@@ -1877,8 +1939,8 @@ static void queue_multi_channel_manager_event(void *data,
        if (caller) {
                caller_event_string = ast_manager_build_channel_state_string(caller);
                if (!caller_event_string) {
-                       ast_log(AST_LOG_NOTICE, "No caller event string, bailing\n");
-                       return;
+                       ast_log(LOG_NOTICE, "No caller event string, bailing\n");
+                       return NULL;
                }
        }
 
@@ -1886,17 +1948,17 @@ static void queue_multi_channel_manager_event(void *data,
        if (agent) {
                agent_event_string = ast_manager_build_channel_state_string_prefix(agent, "Dest");
                if (!agent_event_string) {
-                       ast_log(AST_LOG_NOTICE, "No agent event string, bailing\n");
-                       return;
+                       ast_log(LOG_NOTICE, "No agent event string, bailing\n");
+                       return NULL;
                }
        }
 
        event_string = ast_manager_str_from_json_object(ast_multi_channel_blob_get_json(obj), NULL);
        if (!event_string) {
-               return;
+               return NULL;
        }
 
-       manager_event(EVENT_FLAG_AGENT, type,
+       return ast_manager_event_blob_create(EVENT_FLAG_AGENT, type,
                "%s"
                "%s"
                "%s",
@@ -1905,24 +1967,47 @@ static void queue_multi_channel_manager_event(void *data,
                ast_str_buffer(event_string));
 }
 
-static void queue_member_manager_event(void *data,
-       struct stasis_subscription *sub,
-       struct stasis_message *message)
+static struct ast_manager_event_blob *queue_agent_called_to_ami(struct stasis_message *message)
 {
-       const char *type = data;
-       struct ast_json_payload *payload = stasis_message_data(message);
-       struct ast_json *event = payload->json;
-       RAII_VAR(struct ast_str *, event_string, NULL, ast_free);
+       return queue_multi_channel_to_ami("AgentCalled", message);
+}
 
-       event_string = ast_manager_str_from_json_object(event, NULL);
-       if (!event_string) {
-               return;
-       }
+static struct ast_manager_event_blob *queue_agent_connect_to_ami(struct stasis_message *message)
+{
+       return queue_multi_channel_to_ami("AgentConnect", message);
+}
 
-       manager_event(EVENT_FLAG_AGENT, type,
-               "%s", ast_str_buffer(event_string));
+static struct ast_manager_event_blob *queue_agent_complete_to_ami(struct stasis_message *message)
+{
+       return queue_multi_channel_to_ami("AgentComplete", message);
 }
 
+static struct ast_manager_event_blob *queue_agent_dump_to_ami(struct stasis_message *message)
+{
+       return queue_multi_channel_to_ami("AgentDump", message);
+}
+
+static struct ast_manager_event_blob *queue_agent_ringnoanswer_to_ami(struct stasis_message *message)
+{
+       return queue_multi_channel_to_ami("AgentRingNoAnswer", message);
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_called_type,
+       .to_ami = queue_agent_called_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_connect_type,
+       .to_ami = queue_agent_connect_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_complete_type,
+       .to_ami = queue_agent_complete_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_dump_type,
+       .to_ami = queue_agent_dump_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type,
+       .to_ami = queue_agent_ringnoanswer_to_ami,
+       );
+
 static void queue_publish_multi_channel_snapshot_blob(struct stasis_topic *topic,
                struct ast_channel_snapshot *caller_snapshot,
                struct ast_channel_snapshot *agent_snapshot,
@@ -10344,30 +10429,9 @@ static struct stasis_forward *topic_forwarder;
 
 static int unload_module(void)
 {
-       int res;
-       struct ao2_iterator q_iter;
-       struct call_queue *q = NULL;
-
-       struct stasis_message_router *message_router;
-
-       message_router = ast_manager_get_message_router();
-       if (message_router) {
-               stasis_message_router_remove(message_router, queue_caller_join_type());
-               stasis_message_router_remove(message_router, queue_caller_leave_type());
-               stasis_message_router_remove(message_router, queue_caller_abandon_type());
-               stasis_message_router_remove(message_router, queue_member_status_type());
-               stasis_message_router_remove(message_router, queue_member_added_type());
-               stasis_message_router_remove(message_router, queue_member_removed_type());
-               stasis_message_router_remove(message_router, queue_member_pause_type());
-               stasis_message_router_remove(message_router, queue_member_penalty_type());
-               stasis_message_router_remove(message_router, queue_member_ringinuse_type());
-               stasis_message_router_remove(message_router, queue_agent_called_type());
-               stasis_message_router_remove(message_router, queue_agent_connect_type());
-               stasis_message_router_remove(message_router, queue_agent_complete_type());
-               stasis_message_router_remove(message_router, queue_agent_dump_type());
-               stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
-       }
        stasis_message_router_unsubscribe_and_join(agent_router);
+       agent_router = NULL;
+
        topic_forwarder = stasis_forward_cancel(topic_forwarder);
 
        STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
@@ -10388,47 +10452,42 @@ static int unload_module(void)
        STASIS_MESSAGE_TYPE_CLEANUP(queue_agent_ringnoanswer_type);
 
        ast_cli_unregister_multiple(cli_queue, ARRAY_LEN(cli_queue));
-       res = ast_manager_unregister("QueueStatus");
-       res |= ast_manager_unregister("Queues");
-       res |= ast_manager_unregister("QueueRule");
-       res |= ast_manager_unregister("QueueSummary");
-       res |= ast_manager_unregister("QueueAdd");
-       res |= ast_manager_unregister("QueueRemove");
-       res |= ast_manager_unregister("QueuePause");
-       res |= ast_manager_unregister("QueueLog");
-       res |= ast_manager_unregister("QueuePenalty");
-       res |= ast_manager_unregister("QueueReload");
-       res |= ast_manager_unregister("QueueReset");
-       res |= ast_manager_unregister("QueueMemberRingInUse");
-       res |= ast_unregister_application(app_aqm);
-       res |= ast_unregister_application(app_rqm);
-       res |= ast_unregister_application(app_pqm);
-       res |= ast_unregister_application(app_upqm);
-       res |= ast_unregister_application(app_ql);
-       res |= ast_unregister_application(app);
-       res |= ast_custom_function_unregister(&queueexists_function);
-       res |= ast_custom_function_unregister(&queuevar_function);
-       res |= ast_custom_function_unregister(&queuemembercount_function);
-       res |= ast_custom_function_unregister(&queuemembercount_dep);
-       res |= ast_custom_function_unregister(&queuememberlist_function);
-       res |= ast_custom_function_unregister(&queuewaitingcount_function);
-       res |= ast_custom_function_unregister(&queuememberpenalty_function);
-
-       res |= ast_data_unregister(NULL);
+       ast_manager_unregister("QueueStatus");
+       ast_manager_unregister("Queues");
+       ast_manager_unregister("QueueRule");
+       ast_manager_unregister("QueueSummary");
+       ast_manager_unregister("QueueAdd");
+       ast_manager_unregister("QueueRemove");
+       ast_manager_unregister("QueuePause");
+       ast_manager_unregister("QueueLog");
+       ast_manager_unregister("QueuePenalty");
+       ast_manager_unregister("QueueReload");
+       ast_manager_unregister("QueueReset");
+       ast_manager_unregister("QueueMemberRingInUse");
+       ast_unregister_application(app_aqm);
+       ast_unregister_application(app_rqm);
+       ast_unregister_application(app_pqm);
+       ast_unregister_application(app_upqm);
+       ast_unregister_application(app_ql);
+       ast_unregister_application(app);
+       ast_custom_function_unregister(&queueexists_function);
+       ast_custom_function_unregister(&queuevar_function);
+       ast_custom_function_unregister(&queuemembercount_function);
+       ast_custom_function_unregister(&queuemembercount_dep);
+       ast_custom_function_unregister(&queuememberlist_function);
+       ast_custom_function_unregister(&queuewaitingcount_function);
+       ast_custom_function_unregister(&queuememberpenalty_function);
+
+       ast_data_unregister(NULL);
 
        device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
        ast_extension_state_del(0, extension_state_cb);
 
-       q_iter = ao2_iterator_init(queues, 0);
-       while ((q = ao2_t_iterator_next(&q_iter, "Iterate through queues"))) {
-               queues_t_unlink(queues, q, "Remove queue from container due to unload");
-               queue_t_unref(q, "Done with iterator");
-       }
-       ao2_iterator_destroy(&q_iter);
-       ao2_ref(queues, -1);
        ast_unload_realtime("queue_members");
-       return res;
+       ao2_cleanup(queues);
+       queues = NULL;
+       return 0;
 }
 
 /*!
@@ -10443,19 +10502,23 @@ static int unload_module(void)
  */
 static int load_module(void)
 {
-       int res;
+       int err = 0;
        struct ast_flags mask = {AST_FLAGS_ALL, };
        struct ast_config *member_config;
-       struct stasis_message_router *manager_router;
        struct stasis_topic *queue_topic;
        struct stasis_topic *manager_topic;
 
        queues = ao2_container_alloc(MAX_QUEUE_BUCKETS, queue_hash_cb, queue_cmp_cb);
+       if (!queues) {
+               return AST_MODULE_LOAD_DECLINE;
+       }
 
        use_weight = 0;
 
-       if (reload_handler(0, &mask, NULL))
+       if (reload_handler(0, &mask, NULL)) {
+               unload_module();
                return AST_MODULE_LOAD_DECLINE;
+       }
 
        ast_realtime_require_field("queue_members", "paused", RQ_INTEGER1, 1, "uniqueid", RQ_UINTEGER2, 5, SENTINEL);
 
@@ -10468,6 +10531,7 @@ static int load_module(void)
                realtime_ringinuse_field = "ringinuse";
        } else {
                const char *config_val;
+
                if ((config_val = ast_variable_retrieve(member_config, NULL, "ringinuse"))) {
                        ast_log(LOG_NOTICE, "ringinuse field entries found in queue_members table. Using 'ringinuse'\n");
                        realtime_ringinuse_field = "ringinuse";
@@ -10479,168 +10543,102 @@ static int load_module(void)
                        realtime_ringinuse_field = "ringinuse";
                }
        }
-
        ast_config_destroy(member_config);
 
-       if (queue_persistent_members)
+       if (queue_persistent_members) {
                reload_queue_members();
+       }
 
        ast_data_register_multiple(queue_data_providers, ARRAY_LEN(queue_data_providers));
 
-       ast_cli_register_multiple(cli_queue, ARRAY_LEN(cli_queue));
-       res = ast_register_application_xml(app, queue_exec);
-       res |= ast_register_application_xml(app_aqm, aqm_exec);
-       res |= ast_register_application_xml(app_rqm, rqm_exec);
-       res |= ast_register_application_xml(app_pqm, pqm_exec);
-       res |= ast_register_application_xml(app_upqm, upqm_exec);
-       res |= ast_register_application_xml(app_ql, ql_exec);
-       res |= ast_manager_register_xml("Queues", 0, manager_queues_show);
-       res |= ast_manager_register_xml("QueueStatus", 0, manager_queues_status);
-       res |= ast_manager_register_xml("QueueSummary", 0, manager_queues_summary);
-       res |= ast_manager_register_xml("QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member);
-       res |= ast_manager_register_xml("QueueRemove", EVENT_FLAG_AGENT, manager_remove_queue_member);
-       res |= ast_manager_register_xml("QueuePause", EVENT_FLAG_AGENT, manager_pause_queue_member);
-       res |= ast_manager_register_xml("QueueLog", EVENT_FLAG_AGENT, manager_queue_log_custom);
-       res |= ast_manager_register_xml("QueuePenalty", EVENT_FLAG_AGENT, manager_queue_member_penalty);
-       res |= ast_manager_register_xml("QueueMemberRingInUse", EVENT_FLAG_AGENT, manager_queue_member_ringinuse);
-       res |= ast_manager_register_xml("QueueRule", 0, manager_queue_rule_show);
-       res |= ast_manager_register_xml("QueueReload", 0, manager_queue_reload);
-       res |= ast_manager_register_xml("QueueReset", 0, manager_queue_reset);
-       res |= ast_custom_function_register(&queuevar_function);
-       res |= ast_custom_function_register(&queueexists_function);
-       res |= ast_custom_function_register(&queuemembercount_function);
-       res |= ast_custom_function_register(&queuemembercount_dep);
-       res |= ast_custom_function_register(&queuememberlist_function);
-       res |= ast_custom_function_register(&queuewaitingcount_function);
-       res |= ast_custom_function_register(&queuememberpenalty_function);
+       err |= ast_cli_register_multiple(cli_queue, ARRAY_LEN(cli_queue));
+       err |= ast_register_application_xml(app, queue_exec);
+       err |= ast_register_application_xml(app_aqm, aqm_exec);
+       err |= ast_register_application_xml(app_rqm, rqm_exec);
+       err |= ast_register_application_xml(app_pqm, pqm_exec);
+       err |= ast_register_application_xml(app_upqm, upqm_exec);
+       err |= ast_register_application_xml(app_ql, ql_exec);
+       err |= ast_manager_register_xml("Queues", 0, manager_queues_show);
+       err |= ast_manager_register_xml("QueueStatus", 0, manager_queues_status);
+       err |= ast_manager_register_xml("QueueSummary", 0, manager_queues_summary);
+       err |= ast_manager_register_xml("QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member);
+       err |= ast_manager_register_xml("QueueRemove", EVENT_FLAG_AGENT, manager_remove_queue_member);
+       err |= ast_manager_register_xml("QueuePause", EVENT_FLAG_AGENT, manager_pause_queue_member);
+       err |= ast_manager_register_xml("QueueLog", EVENT_FLAG_AGENT, manager_queue_log_custom);
+       err |= ast_manager_register_xml("QueuePenalty", EVENT_FLAG_AGENT, manager_queue_member_penalty);
+       err |= ast_manager_register_xml("QueueMemberRingInUse", EVENT_FLAG_AGENT, manager_queue_member_ringinuse);
+       err |= ast_manager_register_xml("QueueRule", 0, manager_queue_rule_show);
+       err |= ast_manager_register_xml("QueueReload", 0, manager_queue_reload);
+       err |= ast_manager_register_xml("QueueReset", 0, manager_queue_reset);
+       err |= ast_custom_function_register(&queuevar_function);
+       err |= ast_custom_function_register(&queueexists_function);
+       err |= ast_custom_function_register(&queuemembercount_function);
+       err |= ast_custom_function_register(&queuemembercount_dep);
+       err |= ast_custom_function_register(&queuememberlist_function);
+       err |= ast_custom_function_register(&queuewaitingcount_function);
+       err |= ast_custom_function_register(&queuememberpenalty_function);
 
        /* in the following subscribe call, do I use DEVICE_STATE, or DEVICE_STATE_CHANGE? */
-       if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
-               res = -1;
+       device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL);
+       if (!device_state_sub) {
+               err = -1;
        }
 
        manager_topic = ast_manager_get_topic();
-       if (!manager_topic) {
-               return AST_MODULE_LOAD_DECLINE;
-       }
-       manager_router = ast_manager_get_message_router();
-       if (!manager_router) {
-               return AST_MODULE_LOAD_DECLINE;
-       }
        queue_topic = ast_queue_topic_all();
-       if (!queue_topic) {
+       if (!manager_topic || !queue_topic) {
+               unload_module();
                return AST_MODULE_LOAD_DECLINE;
        }
        topic_forwarder = stasis_forward_all(queue_topic, manager_topic);
        if (!topic_forwarder) {
+               unload_module();
                return AST_MODULE_LOAD_DECLINE;
        }
 
+       if (!ast_channel_agent_login_type()
+               || !ast_channel_agent_logoff_type()) {
+               unload_module();
+               return AST_MODULE_LOAD_DECLINE;
+       }
        agent_router = stasis_message_router_create(ast_channel_topic_all());
        if (!agent_router) {
+               unload_module();
                return AST_MODULE_LOAD_DECLINE;
        }
-
-       STASIS_MESSAGE_TYPE_INIT(queue_caller_join_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_caller_leave_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_caller_abandon_type);
-
-       STASIS_MESSAGE_TYPE_INIT(queue_member_status_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_member_added_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_member_removed_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_member_pause_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_member_penalty_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_member_ringinuse_type);
-
-       STASIS_MESSAGE_TYPE_INIT(queue_agent_called_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_agent_connect_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_agent_complete_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_agent_dump_type);
-       STASIS_MESSAGE_TYPE_INIT(queue_agent_ringnoanswer_type);
-
-       stasis_message_router_add(manager_router,
-                                 queue_caller_join_type(),
-                                 queue_channel_manager_event,
-                                 "QueueCallerJoin");
-
-       stasis_message_router_add(manager_router,
-                                 queue_caller_leave_type(),
-                                 queue_channel_manager_event,
-                                 "QueueCallerLeave");
-
-       stasis_message_router_add(manager_router,
-                                 queue_caller_abandon_type(),
-                                 queue_channel_manager_event,
-                                 "QueueCallerAbandon");
-
-       stasis_message_router_add(manager_router,
-                                 queue_member_status_type(),
-                                 queue_member_manager_event,
-                                 "QueueMemberStatus");
-
-       stasis_message_router_add(manager_router,
-                                 queue_member_added_type(),
-                                 queue_member_manager_event,
-                                 "QueueMemberAdded");
-
-       stasis_message_router_add(manager_router,
-                                 queue_member_removed_type(),
-                                 queue_member_manager_event,
-                                 "QueueMemberRemoved");
-
-       stasis_message_router_add(manager_router,
-                                 queue_member_pause_type(),
-                                 queue_member_manager_event,
-                                 "QueueMemberPause");
-
-       stasis_message_router_add(manager_router,
-                                 queue_member_penalty_type(),
-                                 queue_member_manager_event,
-                                 "QueueMemberPenalty");
-
-       stasis_message_router_add(manager_router,
-                                 queue_member_ringinuse_type(),
-                                 queue_member_manager_event,
-                                 "QueueMemberRinginuse");
-
-       stasis_message_router_add(manager_router,
-                                 queue_agent_called_type(),
-                                 queue_multi_channel_manager_event,
-                                 "AgentCalled");
-
-       stasis_message_router_add(manager_router,
-                                 queue_agent_connect_type(),
-                                 queue_multi_channel_manager_event,
-                                 "AgentConnect");
-
-       stasis_message_router_add(manager_router,
-                                 queue_agent_complete_type(),
-                                 queue_multi_channel_manager_event,
-                                 "AgentComplete");
-
-       stasis_message_router_add(manager_router,
-                                 queue_agent_dump_type(),
-                                 queue_multi_channel_manager_event,
-                                 "AgentDump");
-
-       stasis_message_router_add(manager_router,
-                                 queue_agent_ringnoanswer_type(),
-                                 queue_multi_channel_manager_event,
-                                 "AgentRingNoAnswer");
-
-       stasis_message_router_add(agent_router,
-                       ast_channel_agent_login_type(),
-                       queue_agent_cb,
-                       NULL);
-
-       stasis_message_router_add(agent_router,
-                       ast_channel_agent_logoff_type(),
-                       queue_agent_cb,
-                       NULL);
+       err |= stasis_message_router_add(agent_router,
+               ast_channel_agent_login_type(),
+               queue_agent_cb,
+               NULL);
+       err |= stasis_message_router_add(agent_router,
+               ast_channel_agent_logoff_type(),
+               queue_agent_cb,
+               NULL);
+
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_caller_join_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_caller_leave_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_caller_abandon_type);
+
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_member_status_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_member_added_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_member_removed_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_member_pause_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_member_penalty_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_member_ringinuse_type);
+
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_agent_called_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_agent_connect_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_agent_complete_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_agent_dump_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(queue_agent_ringnoanswer_type);
 
        ast_extension_state_add(NULL, NULL, extension_state_cb, NULL);
 
-       return res ? AST_MODULE_LOAD_DECLINE : 0;
+       if (err) {
+               unload_module();
+               return AST_MODULE_LOAD_DECLINE;
+       }
+       return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int reload(void)
index e69a24c..ed70356 100644 (file)
@@ -1033,38 +1033,66 @@ enum agi_result {
        AGI_RESULT_HANGUP,
 };
 
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_exec_start_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_exec_end_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_start_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_exec_type);
-STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type);
-
-static void agi_channel_manager_event(void *data,
-       struct stasis_subscription *sub,
-       struct stasis_message *message)
+static struct ast_manager_event_blob *agi_channel_to_ami(const char *type, struct stasis_message *message)
 {
-       const char *type = data;
        struct ast_channel_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+       RAII_VAR(struct ast_str *, channel_string, NULL, ast_free);
        RAII_VAR(struct ast_str *, event_string, NULL, ast_free);
 
-       channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
-       if (!channel_event_string) {
-               return;
-       }
-
+       channel_string = ast_manager_build_channel_state_string(obj->snapshot);
        event_string = ast_manager_str_from_json_object(obj->blob, NULL);
-       if (!event_string) {
-               return;
+       if (!channel_string || !event_string) {
+               return NULL;
        }
 
-       manager_event(EVENT_FLAG_AGI, type,
+       return ast_manager_event_blob_create(EVENT_FLAG_AGI, type,
                "%s"
                "%s",
-               ast_str_buffer(channel_event_string),
+               ast_str_buffer(channel_string),
                ast_str_buffer(event_string));
 }
 
+static struct ast_manager_event_blob *agi_exec_start_to_ami(struct stasis_message *message)
+{
+       return agi_channel_to_ami("AGIExecStart", message);
+}
+
+static struct ast_manager_event_blob *agi_exec_end_to_ami(struct stasis_message *message)
+{
+       return agi_channel_to_ami("AGIExecEnd", message);
+}
+
+static struct ast_manager_event_blob *agi_async_start_to_ami(struct stasis_message *message)
+{
+       return agi_channel_to_ami("AsyncAGIStart", message);
+}
+
+static struct ast_manager_event_blob *agi_async_exec_to_ami(struct stasis_message *message)
+{
+       return agi_channel_to_ami("AsyncAGIExec", message);
+}
+
+static struct ast_manager_event_blob *agi_async_end_to_ami(struct stasis_message *message)
+{
+       return agi_channel_to_ami("AsyncAGIEnd", message);
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_exec_start_type,
+       .to_ami = agi_exec_start_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_exec_end_type,
+       .to_ami = agi_exec_end_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_start_type,
+       .to_ami = agi_async_start_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_exec_type,
+       .to_ami = agi_async_exec_to_ami,
+       );
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type,
+       .to_ami = agi_async_end_to_ami,
+       );
+
 static agi_command *find_command(const char * const cmds[], int exact);
 
 AST_THREADSTORAGE(agi_buf);
@@ -3468,10 +3496,9 @@ int AST_OPTIONAL_API_NAME(ast_agi_unregister)(struct ast_module *mod, agi_comman
        }
        AST_RWLIST_TRAVERSE_SAFE_END;
        AST_RWLIST_UNLOCK(&agi_commands);
-       if (unregistered)
+       if (unregistered) {
                ast_verb(2, "AGI Command '%s' unregistered\n",fullcmd);
-       else
-               ast_log(LOG_WARNING, "Unable to unregister command: '%s'!\n",fullcmd);
+       }
        return unregistered;
 }
 
@@ -4256,17 +4283,6 @@ AST_TEST_DEFINE(test_agi_null_docs)
 
 static int unload_module(void)
 {
-       struct stasis_message_router *message_router;
-
-       message_router = ast_manager_get_message_router();
-       if (message_router) {
-               stasis_message_router_remove(message_router, agi_exec_start_type());
-               stasis_message_router_remove(message_router, agi_exec_end_type());
-               stasis_message_router_remove(message_router, agi_async_start_type());
-               stasis_message_router_remove(message_router, agi_async_exec_type());
-               stasis_message_router_remove(message_router, agi_async_end_type());
-       }
-
        STASIS_MESSAGE_TYPE_CLEANUP(agi_exec_start_type);
        STASIS_MESSAGE_TYPE_CLEANUP(agi_exec_end_type);
        STASIS_MESSAGE_TYPE_CLEANUP(agi_async_start_type);
@@ -4274,67 +4290,39 @@ static int unload_module(void)
        STASIS_MESSAGE_TYPE_CLEANUP(agi_async_end_type);
 
        ast_cli_unregister_multiple(cli_agi, ARRAY_LEN(cli_agi));
-       /* we can safely ignore the result of ast_agi_unregister_multiple() here, since it cannot fail, as
-          we know that these commands were registered by this module and are still registered
-       */
-       (void) ast_agi_unregister_multiple(ast_module_info->self, commands, ARRAY_LEN(commands));
+       ast_agi_unregister_multiple(ast_module_info->self, commands, ARRAY_LEN(commands));
        ast_unregister_application(eapp);
        ast_unregister_application(deadapp);
        ast_manager_unregister("AGI");
+       ast_unregister_application(app);
        AST_TEST_UNREGISTER(test_agi_null_docs);
-       return ast_unregister_application(app);
+       return 0;
 }
 
 static int load_module(void)
 {
-       struct stasis_message_router *message_router;
+       int err = 0;
 
-       message_router = ast_manager_get_message_router();
-       if (!message_router) {
-               return AST_MODULE_LOAD_DECLINE;
-       }
+       err |= STASIS_MESSAGE_TYPE_INIT(agi_exec_start_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(agi_exec_end_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(agi_async_start_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(agi_async_exec_type);
+       err |= STASIS_MESSAGE_TYPE_INIT(agi_async_end_type);
+
+       err |= ast_cli_register_multiple(cli_agi, ARRAY_LEN(cli_agi));
+       err |= ast_agi_register_multiple(ast_module_info->self, commands, ARRAY_LEN(commands));
+       err |= ast_register_application_xml(deadapp, deadagi_exec);
+       err |= ast_register_application_xml(eapp, eagi_exec);
+       err |= ast_manager_register_xml("AGI", EVENT_FLAG_AGI, action_add_agi_cmd);
+       err |= ast_register_application_xml(app, agi_exec);
 
-       STASIS_MESSAGE_TYPE_INIT(agi_exec_start_type);
-       STASIS_MESSAGE_TYPE_INIT(agi_exec_end_type);
-       STASIS_MESSAGE_TYPE_INIT(agi_async_start_type);
-       STASIS_MESSAGE_TYPE_INIT(agi_async_exec_type);
-       STASIS_MESSAGE_TYPE_INIT(agi_async_end_type);
-
-       stasis_message_router_add(message_router,
-                                 agi_exec_start_type(),
-                                 agi_channel_manager_event,
-                                 "AGIExecStart");
-
-       stasis_message_router_add(message_router,
-                                 agi_exec_end_type(),
-                                 agi_channel_manager_event,
-                                 "AGIExecEnd");
-
-       stasis_message_router_add(message_router,
-                                 agi_async_start_type(),
-                                 agi_channel_manager_event,
-                                 "AsyncAGIStart");
-
-       stasis_message_router_add(message_router,
-                                 agi_async_exec_type(),
-                                 agi_channel_manager_event,
-                                 "AsyncAGIExec");
-
-       stasis_message_router_add(message_router,
-                                 agi_async_end_type(),
-                                 agi_channel_manager_event,
-                                 "AsyncAGIEnd");
-
-       ast_cli_register_multiple(cli_agi, ARRAY_LEN(cli_agi));
-       /* we can safely ignore the result of ast_agi_register_multiple() here, since it cannot fail, as
-          no other commands have been registered yet
-       */
-       (void) ast_agi_register_multiple(ast_module_info->self, commands, ARRAY_LEN(commands));
-       ast_register_application_xml(deadapp, deadagi_exec);
-       ast_register_application_xml(eapp, eagi_exec);
-       ast_manager_register_xml("AGI", EVENT_FLAG_AGI, action_add_agi_cmd);
        AST_TEST_REGISTER(test_agi_null_docs);
-       return ast_register_application_xml(app, agi_exec);
+
+       if (err) {
+               unload_module();
+               return AST_MODULE_LOAD_DECLINE;
+       }
+       return AST_MODULE_LOAD_SUCCESS;
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Asterisk Gateway Interface (AGI)",