Merge "lock: Improve performance of DEBUG_THREADS."
[asterisk/asterisk.git] / main / cdr.c
index 0ec6ada..1c47e24 100644 (file)
@@ -53,6 +53,7 @@
 #include "asterisk/cdr.h"
 #include "asterisk/callerid.h"
 #include "asterisk/manager.h"
+#include "asterisk/module.h"
 #include "asterisk/causes.h"
 #include "asterisk/linkedlists.h"
 #include "asterisk/utils.h"
@@ -371,7 +372,7 @@ static ast_cond_t cdr_pending_cond;
 /*! \brief A container of the active master CDRs indexed by Party A channel uniqueid */
 static struct ao2_container *active_cdrs_master;
 
-/*! \brief A container of all active CDRs indexed by Party B channel name */
+/*! \brief A container of all active CDRs with a Party B indexed by Party B channel name */
 static struct ao2_container *active_cdrs_all;
 
 /*! \brief Message router for stasis messages regarding channel state */
@@ -971,13 +972,21 @@ static void cdr_all_unlink(struct cdr_object *cdr)
 
        ast_assert(cdr->is_root);
 
+       /* Hold a ref to the root CDR to ensure the list members don't go away on us. */
+       ao2_ref(cdr, +1);
        ao2_lock(active_cdrs_all);
-       for (cur = cdr->next; cur; cur = next) {
+       for (cur = cdr; cur; cur = next) {
                next = cur->next;
                ao2_unlink_flags(active_cdrs_all, cur, OBJ_NOLOCK);
+               /*
+                * It is safe to still use cur after unlinking because the
+                * root CDR holds a ref to all the CDRs in the list and we
+                * have a ref to the root CDR.
+                */
                ast_string_field_set(cur, party_b_name, "");
        }
        ao2_unlock(active_cdrs_all);
+       ao2_ref(cdr, -1);
 }
 
 /*!
@@ -3776,6 +3785,7 @@ static void cdr_submit_batch(int do_shutdown)
 static int submit_scheduled_batch(const void *data)
 {
        struct module_config *mod_cfg;
+       int nextms;
 
        cdr_submit_batch(0);
 
@@ -3784,25 +3794,23 @@ static int submit_scheduled_batch(const void *data)
                return 0;
        }
 
-       /* manually reschedule from this point in time */
-       ast_mutex_lock(&cdr_sched_lock);
-       cdr_sched = ast_sched_add(sched, mod_cfg->general->batch_settings.time * 1000, submit_scheduled_batch, NULL);
-       ast_mutex_unlock(&cdr_sched_lock);
+       /* Calculate the next scheduled interval */
+       nextms = mod_cfg->general->batch_settings.time * 1000;
 
        ao2_cleanup(mod_cfg);
-       /* returning zero so the scheduler does not automatically reschedule */
-       return 0;
+
+       return nextms;
 }
 
 /*! Do not hold the batch lock while calling this function */
-static void submit_unscheduled_batch(void)
+static void start_batch_mode(void)
 {
        /* Prevent two deletes from happening at the same time */
        ast_mutex_lock(&cdr_sched_lock);
        /* this is okay since we are not being called from within the scheduler */
        AST_SCHED_DEL(sched, cdr_sched);
        /* schedule the submission to occur ASAP (1 ms) */
-       cdr_sched = ast_sched_add(sched, 1, submit_scheduled_batch, NULL);
+       cdr_sched = ast_sched_add_variable(sched, 1, submit_scheduled_batch, NULL, 1);
        ast_mutex_unlock(&cdr_sched_lock);
 
        /* signal the do_cdr thread to wakeup early and do some work (that lazy thread ;) */
@@ -3867,9 +3875,9 @@ static void cdr_detach(struct ast_cdr *cdr)
        }
        ast_mutex_unlock(&cdr_batch_lock);
 
-       /* Don't call submit_unscheduled_batch with the cdr_batch_lock held */
+       /* Don't submit a batch with cdr_batch_lock held */
        if (submit_batch) {
-               submit_unscheduled_batch();
+               start_batch_mode();
        }
 }
 
@@ -3883,7 +3891,7 @@ static void *do_cdr(void *data)
                struct timeval now;
                schedms = ast_sched_wait(sched);
                /* this shouldn't happen, but provide a 1 second default just in case */
-               if (schedms <= 0)
+               if (schedms < 0)
                        schedms = 1000;
                now = ast_tvadd(ast_tvnow(), ast_samp2tv(schedms, 1000));
                timeout.tv_sec = now.tv_sec;
@@ -3893,7 +3901,7 @@ static void *do_cdr(void *data)
                ast_cond_timedwait(&cdr_pending_cond, &cdr_pending_lock, &timeout);
                numevents = ast_sched_runq(sched);
                ast_mutex_unlock(&cdr_pending_lock);
-               ast_debug(2, "Processed %d scheduled CDR batches from the run queue\n", numevents);
+               ast_debug(2, "Processed %d CDR batches from the run queue\n", numevents);
        }
 
        return NULL;
@@ -3942,18 +3950,14 @@ static char *handle_cli_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_a
 /*! \brief Complete user input for 'cdr show' */
 static char *cli_complete_show(struct ast_cli_args *a)
 {
-       char *result = NULL;
        int wordlen = strlen(a->word);
-       int which = 0;
        struct ao2_iterator it_cdrs;
        struct cdr_object *cdr;
 
        it_cdrs = ao2_iterator_init(active_cdrs_master, 0);
        while ((cdr = ao2_iterator_next(&it_cdrs))) {
-               if (!strncasecmp(a->word, cdr->party_a.snapshot->name, wordlen) &&
-                       (++which > a->n)) {
-                       result = ast_strdup(cdr->party_a.snapshot->name);
-                       if (result) {
+               if (!strncasecmp(a->word, cdr->party_a.snapshot->name, wordlen)) {
+                       if (ast_cli_completion_add(ast_strdup(cdr->party_a.snapshot->name))) {
                                ao2_ref(cdr, -1);
                                break;
                        }
@@ -3961,7 +3965,8 @@ static char *cli_complete_show(struct ast_cli_args *a)
                ao2_ref(cdr, -1);
        }
        ao2_iterator_destroy(&it_cdrs);
-       return result;
+
+       return NULL;
 }
 
 static void cli_show_channels(struct ast_cli_args *a)
@@ -4211,10 +4216,10 @@ static char *handle_cli_submit(struct ast_cli_entry *e, int cmd, struct ast_cli_
 
        if (!ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) {
                ast_cli(a->fd, "Cannot submit CDR batch: CDR engine disabled.\n");
-       } else if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
+       } else if (!ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
                ast_cli(a->fd, "Cannot submit CDR batch: batch mode not enabled.\n");
        } else {
-               submit_unscheduled_batch();
+               start_batch_mode();
                ast_cli(a->fd, "Submitted CDRs to backend engines for processing.  This may take a while.\n");
        }
        ao2_cleanup(mod_cfg);
@@ -4330,7 +4335,7 @@ static int process_config(int reload)
                aco_option_register(&cfg_info, "scheduleronly", ACO_EXACT, general_options, DEFAULT_BATCH_SCHEDULER_ONLY, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, batch_settings.settings), BATCH_MODE_SCHEDULER_ONLY);
                aco_option_register(&cfg_info, "safeshutdown", ACO_EXACT, general_options, DEFAULT_BATCH_SAFE_SHUTDOWN, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, batch_settings.settings), BATCH_MODE_SAFE_SHUTDOWN);
                aco_option_register(&cfg_info, "size", ACO_EXACT, general_options, DEFAULT_BATCH_SIZE, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.size), 0, MAX_BATCH_SIZE);
-               aco_option_register(&cfg_info, "time", ACO_EXACT, general_options, DEFAULT_BATCH_TIME, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.time), 0, MAX_BATCH_TIME);
+               aco_option_register(&cfg_info, "time", ACO_EXACT, general_options, DEFAULT_BATCH_TIME, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.time), 1, MAX_BATCH_TIME);
        }
 
        if (aco_process_config(&cfg_info, reload) == ACO_PROCESS_ERROR) {
@@ -4358,11 +4363,6 @@ static int process_config(int reload)
        return 0;
 }
 
-static void cdr_engine_cleanup(void)
-{
-       destroy_subscriptions();
-}
-
 static void cdr_engine_shutdown(void)
 {
        stasis_message_router_unsubscribe_and_join(stasis_router);
@@ -4396,8 +4396,6 @@ static void cdr_engine_shutdown(void)
 
 static void cdr_enable_batch_mode(struct ast_cdr_config *config)
 {
-       SCOPED_LOCK(batch, &cdr_batch_lock, ast_mutex_lock, ast_mutex_unlock);
-
        /* Only create the thread level portions once */
        if (cdr_thread == AST_PTHREADT_NULL) {
                ast_cond_init(&cdr_pending_cond, NULL);
@@ -4407,9 +4405,9 @@ static void cdr_enable_batch_mode(struct ast_cdr_config *config)
                }
        }
 
-       /* Kill the currently scheduled item */
-       AST_SCHED_DEL(sched, cdr_sched);
-       cdr_sched = ast_sched_add(sched, config->batch_settings.time * 1000, submit_scheduled_batch, NULL);
+       /* Start the batching process */
+       start_batch_mode();
+
        ast_log(LOG_NOTICE, "CDR batch mode logging enabled, first of either size %u or time %u seconds.\n",
                        config->batch_settings.size, config->batch_settings.time);
 }
@@ -4495,26 +4493,33 @@ static int cdr_toggle_runtime_options(void)
        return mod_cfg ? 0 : -1;
 }
 
-int ast_cdr_engine_init(void)
+static int unload_module(void)
+{
+       destroy_subscriptions();
+
+       return 0;
+}
+
+static int load_module(void)
 {
        if (process_config(0)) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        cdr_topic = stasis_topic_create("cdr_engine");
        if (!cdr_topic) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        stasis_router = stasis_message_router_create(cdr_topic);
        if (!stasis_router) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
        stasis_message_router_set_congestion_limits(stasis_router, -1,
                10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
        if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
@@ -4527,28 +4532,27 @@ int ast_cdr_engine_init(void)
        active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
                NUM_CDR_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn);
        if (!active_cdrs_master) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
        ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn);
 
        active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
                NUM_CDR_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn);
        if (!active_cdrs_all) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
        ao2_container_register("cdrs_all", active_cdrs_all, cdr_all_print_fn);
 
        sched = ast_sched_context_create();
        if (!sched) {
                ast_log(LOG_ERROR, "Unable to create schedule context.\n");
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
-       ast_register_cleanup(cdr_engine_cleanup);
        ast_register_atexit(cdr_engine_shutdown);
 
-       return cdr_toggle_runtime_options();
+       return cdr_toggle_runtime_options() ? AST_MODULE_LOAD_FAILURE : AST_MODULE_LOAD_SUCCESS;
 }
 
 void ast_cdr_engine_term(void)
@@ -4591,7 +4595,7 @@ void ast_cdr_engine_term(void)
        }
 }
 
-int ast_cdr_engine_reload(void)
+static int reload_module(void)
 {
        struct module_config *old_mod_cfg;
        struct module_config *mod_cfg;
@@ -4617,3 +4621,12 @@ int ast_cdr_engine_reload(void)
        ao2_cleanup(old_mod_cfg);
        return cdr_toggle_runtime_options();
 }
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "CDR Engine",
+       .support_level = AST_MODULE_SUPPORT_CORE,
+       .load = load_module,
+       .unload = unload_module,
+       .reload = reload_module,
+       .load_pri = AST_MODPRI_CORE,
+       .requires = "extconfig",
+);