Merge "lock: Improve performance of DEBUG_THREADS."
[asterisk/asterisk.git] / main / cdr.c
index def3871..1c47e24 100644 (file)
@@ -53,6 +53,7 @@
 #include "asterisk/cdr.h"
 #include "asterisk/callerid.h"
 #include "asterisk/manager.h"
+#include "asterisk/module.h"
 #include "asterisk/causes.h"
 #include "asterisk/linkedlists.h"
 #include "asterisk/utils.h"
@@ -238,8 +239,29 @@ static struct aco_type general_option = {
        .type = ACO_GLOBAL,
        .name = "general",
        .item_offset = offsetof(struct module_config, general),
-       .category = "^general$",
-       .category_match = ACO_WHITELIST,
+       .category = "general",
+       .category_match = ACO_WHITELIST_EXACT,
+};
+
+/*! Config sections used by existing modules. Do not add to this list. */
+static const char *ignore_categories[] = {
+       "csv",
+       "custom",
+       "manager",
+       "odbc",
+       "pgsql",
+       "radius",
+       "sqlite",
+       "tds",
+       "mysql",
+       NULL,
+};
+
+static struct aco_type ignore_option = {
+       .type = ACO_IGNORE,
+       .name = "modules",
+       .category = (const char*)ignore_categories,
+       .category_match = ACO_WHITELIST_ARRAY,
 };
 
 static void *module_config_alloc(void);
@@ -249,8 +271,7 @@ static void module_config_post_apply(void);
 /*! \brief The file definition */
 static struct aco_file module_file_conf = {
        .filename = "cdr.conf",
-       .skip_category = "(^csv$|^custom$|^manager$|^odbc$|^pgsql$|^radius$|^sqlite$|^tds$|^mysql$)",
-       .types = ACO_TYPES(&general_option),
+       .types = ACO_TYPES(&general_option, &ignore_option),
 };
 
 CONFIG_INFO_CORE("cdr", cfg_info, module_configs, module_config_alloc,
@@ -348,8 +369,11 @@ AST_MUTEX_DEFINE_STATIC(cdr_batch_lock);
 AST_MUTEX_DEFINE_STATIC(cdr_pending_lock);
 static ast_cond_t cdr_pending_cond;
 
-/*! \brief A container of the active CDRs indexed by Party A channel id */
-static struct ao2_container *active_cdrs_by_channel;
+/*! \brief A container of the active master CDRs indexed by Party A channel uniqueid */
+static struct ao2_container *active_cdrs_master;
+
+/*! \brief A container of all active CDRs with a Party B indexed by Party B channel name */
+static struct ao2_container *active_cdrs_all;
 
 /*! \brief Message router for stasis messages regarding channel state */
 static struct stasis_message_router *stasis_router;
@@ -706,13 +730,14 @@ struct cdr_object {
        struct ast_flags flags;                 /*!< Flags on the CDR */
        AST_DECLARE_STRING_FIELDS(
                AST_STRING_FIELD(linkedid);         /*!< Linked ID. Cached here as it may change out from party A, which must be immutable */
-               AST_STRING_FIELD(uniqueid);                     /*!< Unique id of party A. Cached here as it is the primary key of this CDR */
+               AST_STRING_FIELD(uniqueid);                     /*!< Unique id of party A. Cached here as it is the master CDR container key */
                AST_STRING_FIELD(name);             /*!< Channel name of party A. Cached here as the party A address may change */
                AST_STRING_FIELD(bridge);           /*!< The bridge the party A happens to be in. */
                AST_STRING_FIELD(appl);             /*!< The last accepted application party A was in */
                AST_STRING_FIELD(data);             /*!< The data for the last accepted application party A was in */
                AST_STRING_FIELD(context);          /*!< The accepted context for Party A */
                AST_STRING_FIELD(exten);            /*!< The accepted extension for Party A */
+               AST_STRING_FIELD(party_b_name);     /*!< Party B channel name. Cached here as it is the all CDRs container key */
        );
        struct cdr_object *next;                /*!< The next CDR object in the chain */
        struct cdr_object *last;                /*!< The last CDR object in the chain */
@@ -793,9 +818,12 @@ static void cdr_object_transition_state(struct cdr_object *cdr, struct cdr_objec
                cdr->fn_table->init_function(cdr);
        }
 }
-/*! \internal
- * \brief Hash function for containers of CDRs indexing by Party A uniqueid */
-static int cdr_object_channel_hash_fn(const void *obj, const int flags)
+
+/*!
+ * \internal
+ * \brief Hash function for master CDR container indexed by Party A uniqueid.
+ */
+static int cdr_master_hash_fn(const void *obj, const int flags)
 {
        const struct cdr_object *cdr;
        const char *key;
@@ -815,10 +843,11 @@ static int cdr_object_channel_hash_fn(const void *obj, const int flags)
        return ast_str_case_hash(key);
 }
 
-/*! \internal
- * \brief Comparison function for containers of CDRs indexing by Party A uniqueid
+/*!
+ * \internal
+ * \brief Comparison function for master CDR container indexed by Party A uniqueid.
  */
-static int cdr_object_channel_cmp_fn(void *obj, void *arg, int flags)
+static int cdr_master_cmp_fn(void *obj, void *arg, int flags)
 {
     struct cdr_object *left = obj;
     struct cdr_object *right = arg;
@@ -849,6 +878,118 @@ static int cdr_object_channel_cmp_fn(void *obj, void *arg, int flags)
 }
 
 /*!
+ * \internal
+ * \brief Hash function for all CDR container indexed by Party B channel name.
+ */
+static int cdr_all_hash_fn(const void *obj, const int flags)
+{
+       const struct cdr_object *cdr;
+       const char *key;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_KEY:
+               key = obj;
+               break;
+       case OBJ_SEARCH_OBJECT:
+               cdr = obj;
+               key = cdr->party_b_name;
+               break;
+       default:
+               ast_assert(0);
+               return 0;
+       }
+       return ast_str_case_hash(key);
+}
+
+/*!
+ * \internal
+ * \brief Comparison function for all CDR container indexed by Party B channel name.
+ */
+static int cdr_all_cmp_fn(void *obj, void *arg, int flags)
+{
+    struct cdr_object *left = obj;
+    struct cdr_object *right = arg;
+    const char *right_key = arg;
+    int cmp;
+
+    switch (flags & OBJ_SEARCH_MASK) {
+    case OBJ_SEARCH_OBJECT:
+        right_key = right->party_b_name;
+        /* Fall through */
+    case OBJ_SEARCH_KEY:
+        cmp = strcasecmp(left->party_b_name, right_key);
+        break;
+    case OBJ_SEARCH_PARTIAL_KEY:
+        /*
+         * We could also use a partial key struct containing a length
+         * so strlen() does not get called for every comparison instead.
+         */
+        cmp = strncasecmp(left->party_b_name, right_key, strlen(right_key));
+        break;
+    default:
+        /* Sort can only work on something with a full or partial key. */
+        ast_assert(0);
+        cmp = 0;
+        break;
+    }
+    return cmp ? 0 : CMP_MATCH;
+}
+
+/*!
+ * \internal
+ * \brief Relink the CDR because Party B's snapshot changed.
+ * \since 13.19.0
+ *
+ * \return Nothing
+ */
+static void cdr_all_relink(struct cdr_object *cdr)
+{
+       ao2_lock(active_cdrs_all);
+       if (cdr->party_b.snapshot) {
+               if (strcasecmp(cdr->party_b_name, cdr->party_b.snapshot->name)) {
+                       ao2_unlink_flags(active_cdrs_all, cdr, OBJ_NOLOCK);
+                       ast_string_field_set(cdr, party_b_name, cdr->party_b.snapshot->name);
+                       ao2_link_flags(active_cdrs_all, cdr, OBJ_NOLOCK);
+               }
+       } else {
+               ao2_unlink_flags(active_cdrs_all, cdr, OBJ_NOLOCK);
+               ast_string_field_set(cdr, party_b_name, "");
+       }
+       ao2_unlock(active_cdrs_all);
+}
+
+/*!
+ * \internal
+ * \brief Unlink the master CDR and chained records from the active_cdrs_all container.
+ * \since 13.19.0
+ *
+ * \return Nothing
+ */
+static void cdr_all_unlink(struct cdr_object *cdr)
+{
+       struct cdr_object *cur;
+       struct cdr_object *next;
+
+       ast_assert(cdr->is_root);
+
+       /* Hold a ref to the root CDR to ensure the list members don't go away on us. */
+       ao2_ref(cdr, +1);
+       ao2_lock(active_cdrs_all);
+       for (cur = cdr; cur; cur = next) {
+               next = cur->next;
+               ao2_unlink_flags(active_cdrs_all, cur, OBJ_NOLOCK);
+               /*
+                * It is safe to still use cur after unlinking because the
+                * root CDR holds a ref to all the CDRs in the list and we
+                * have a ref to the root CDR.
+                */
+               ast_string_field_set(cur, party_b_name, "");
+       }
+       ao2_unlock(active_cdrs_all);
+       ao2_ref(cdr, -1);
+}
+
+/*!
  * \brief \ref cdr_object Destructor
  */
 static void cdr_object_dtor(void *obj)
@@ -1509,6 +1650,7 @@ static int single_state_process_dial_begin(struct cdr_object *cdr, struct ast_ch
                CDR_DEBUG("%p - Updated Party A %s snapshot\n", cdr,
                        cdr->party_a.snapshot->name);
                cdr_object_swap_snapshot(&cdr->party_b, peer);
+               cdr_all_relink(cdr);
                CDR_DEBUG("%p - Updated Party B %s snapshot\n", cdr,
                        cdr->party_b.snapshot->name);
 
@@ -1556,6 +1698,7 @@ static int single_state_bridge_enter_comparison(struct cdr_object *cdr,
                CDR_DEBUG("%p - Party A %s has new Party B %s\n",
                        cdr, cdr->party_a.snapshot->name, cand_cdr->party_a.snapshot->name);
                cdr_object_snapshot_copy(&cdr->party_b, &cand_cdr->party_a);
+               cdr_all_relink(cdr);
                if (!cand_cdr->party_b.snapshot) {
                        /* We just stole them - finalize their CDR. Note that this won't
                         * transition their state, it just sets the end time and the
@@ -1576,6 +1719,7 @@ static int single_state_bridge_enter_comparison(struct cdr_object *cdr,
                CDR_DEBUG("%p - Party A %s has new Party B %s\n",
                        cdr, cdr->party_a.snapshot->name, cand_cdr->party_b.snapshot->name);
                cdr_object_snapshot_copy(&cdr->party_b, &cand_cdr->party_b);
+               cdr_all_relink(cdr);
                return 0;
        }
 
@@ -1602,7 +1746,7 @@ static enum process_bridge_enter_results single_state_process_bridge_enter(struc
                struct cdr_object *cand_cdr_master;
                struct cdr_object *cand_cdr;
 
-               cand_cdr_master = ao2_find(active_cdrs_by_channel, channel_id, OBJ_SEARCH_KEY);
+               cand_cdr_master = ao2_find(active_cdrs_master, channel_id, OBJ_SEARCH_KEY);
                if (!cand_cdr_master) {
                        continue;
                }
@@ -1728,8 +1872,6 @@ static int dial_state_process_dial_end(struct cdr_object *cdr, struct ast_channe
 
 static enum process_bridge_enter_results dial_state_process_bridge_enter(struct cdr_object *cdr, struct ast_bridge_snapshot *bridge, struct ast_channel_snapshot *channel)
 {
-       struct ao2_iterator it_cdrs;
-       char *channel_id;
        int success = 0;
 
        ast_string_field_set(cdr, bridge, bridge->uniqueid);
@@ -1741,50 +1883,51 @@ static enum process_bridge_enter_results dial_state_process_bridge_enter(struct
                return BRIDGE_ENTER_ONLY_PARTY;
        }
 
-       for (it_cdrs = ao2_iterator_init(bridge->channels, 0);
-               !success && (channel_id = ao2_iterator_next(&it_cdrs));
-               ao2_ref(channel_id, -1)) {
-               struct cdr_object *cand_cdr_master;
-               struct cdr_object *cand_cdr;
+       /* If we don't have a Party B (originated channel), skip it */
+       if (cdr->party_b.snapshot) {
+               struct ao2_iterator it_cdrs;
+               char *channel_id;
 
-               cand_cdr_master = ao2_find(active_cdrs_by_channel, channel_id, OBJ_SEARCH_KEY);
-               if (!cand_cdr_master) {
-                       continue;
-               }
+               for (it_cdrs = ao2_iterator_init(bridge->channels, 0);
+                       !success && (channel_id = ao2_iterator_next(&it_cdrs));
+                       ao2_ref(channel_id, -1)) {
+                       struct cdr_object *cand_cdr_master;
+                       struct cdr_object *cand_cdr;
 
-               ao2_lock(cand_cdr_master);
-               for (cand_cdr = cand_cdr_master; cand_cdr; cand_cdr = cand_cdr->next) {
-                       /* Skip any records that are not in a bridge or in this bridge.
-                        * I'm not sure how that would happen, but it pays to be careful. */
-                       if (cand_cdr->fn_table != &bridge_state_fn_table ||
-                                       strcmp(cdr->bridge, cand_cdr->bridge)) {
+                       cand_cdr_master = ao2_find(active_cdrs_master, channel_id, OBJ_SEARCH_KEY);
+                       if (!cand_cdr_master) {
                                continue;
                        }
 
-                       /* If we don't have a Party B (originated channel), skip it */
-                       if (!cdr->party_b.snapshot) {
-                               continue;
-                       }
+                       ao2_lock(cand_cdr_master);
+                       for (cand_cdr = cand_cdr_master; cand_cdr; cand_cdr = cand_cdr->next) {
+                               /* Skip any records that are not in a bridge or in this bridge.
+                                * I'm not sure how that would happen, but it pays to be careful. */
+                               if (cand_cdr->fn_table != &bridge_state_fn_table
+                                       || strcmp(cdr->bridge, cand_cdr->bridge)) {
+                                       continue;
+                               }
 
-                       /* Skip any records that aren't our Party B */
-                       if (strcasecmp(cdr->party_b.snapshot->name, cand_cdr->party_a.snapshot->name)) {
-                               continue;
-                       }
-                       cdr_object_snapshot_copy(&cdr->party_b, &cand_cdr->party_a);
-                       /* If they have a Party B, they joined up with someone else as their
-                        * Party A. Don't finalize them as they're active. Otherwise, we
-                        * have stolen them so they need to be finalized.
-                        */
-                       if (!cand_cdr->party_b.snapshot) {
-                               cdr_object_finalize(cand_cdr);
+                               /* Skip any records that aren't our Party B */
+                               if (strcasecmp(cdr->party_b.snapshot->name, cand_cdr->party_a.snapshot->name)) {
+                                       continue;
+                               }
+                               cdr_object_snapshot_copy(&cdr->party_b, &cand_cdr->party_a);
+                               /* If they have a Party B, they joined up with someone else as their
+                                * Party A. Don't finalize them as they're active. Otherwise, we
+                                * have stolen them so they need to be finalized.
+                                */
+                               if (!cand_cdr->party_b.snapshot) {
+                                       cdr_object_finalize(cand_cdr);
+                               }
+                               success = 1;
+                               break;
                        }
-                       success = 1;
-                       break;
+                       ao2_unlock(cand_cdr_master);
+                       ao2_cleanup(cand_cdr_master);
                }
-               ao2_unlock(cand_cdr_master);
-               ao2_cleanup(cand_cdr_master);
+               ao2_iterator_destroy(&it_cdrs);
        }
-       ao2_iterator_destroy(&it_cdrs);
 
        /* We always transition state, even if we didn't get a peer */
        cdr_object_transition_state(cdr, &bridge_state_fn_table);
@@ -1963,7 +2106,12 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
        if (!peer && !caller) {
                return;
        }
-       if (filter_channel_snapshot(peer) || (caller && filter_channel_snapshot(caller))) {
+
+       if (peer && filter_channel_snapshot(peer)) {
+               return;
+       }
+
+       if (caller && filter_channel_snapshot(caller)) {
                return;
        }
 
@@ -1981,9 +2129,9 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
 
        /* Figure out who is running this show */
        if (caller) {
-               cdr = ao2_find(active_cdrs_by_channel, caller->uniqueid, OBJ_SEARCH_KEY);
+               cdr = ao2_find(active_cdrs_master, caller->uniqueid, OBJ_SEARCH_KEY);
        } else {
-               cdr = ao2_find(active_cdrs_by_channel, peer->uniqueid, OBJ_SEARCH_KEY);
+               cdr = ao2_find(active_cdrs_master, peer->uniqueid, OBJ_SEARCH_KEY);
        }
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", caller ? caller->name : peer->name);
@@ -2032,38 +2180,54 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
        ao2_cleanup(cdr);
 }
 
-static int cdr_object_finalize_party_b(void *obj, void *arg, int flags)
+static int cdr_object_finalize_party_b(void *obj, void *arg, void *data, int flags)
 {
        struct cdr_object *cdr = obj;
-       struct ast_channel_snapshot *party_b = arg;
-       struct cdr_object *it_cdr;
 
-       for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-               if (it_cdr->party_b.snapshot
-                       && !strcasecmp(it_cdr->party_b.snapshot->name, party_b->name)) {
-                       /* Don't transition to the finalized state - let the Party A do
-                        * that when its ready
-                        */
-                       cdr_object_finalize(it_cdr);
-               }
+       if (!strcasecmp(cdr->party_b_name, arg)) {
+#ifdef AST_DEVMODE
+               struct ast_channel_snapshot *party_b = data;
+
+               /*
+                * For sanity's sake we also assert the party_b snapshot
+                * is consistent with the key.
+                */
+               ast_assert(cdr->party_b.snapshot
+                       && !strcasecmp(cdr->party_b.snapshot->name, party_b->name));
+#endif
+
+               /* Don't transition to the finalized state - let the Party A do
+                * that when its ready
+                */
+               cdr_object_finalize(cdr);
        }
        return 0;
 }
 
-static int cdr_object_update_party_b(void *obj, void *arg, int flags)
+static int cdr_object_update_party_b(void *obj, void *arg, void *data, int flags)
 {
        struct cdr_object *cdr = obj;
-       struct ast_channel_snapshot *party_b = arg;
-       struct cdr_object *it_cdr;
 
-       for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-               if (!it_cdr->fn_table->process_party_b) {
-                       continue;
-               }
-               if (it_cdr->party_b.snapshot
-                       && !strcasecmp(it_cdr->party_b.snapshot->name, party_b->name)) {
-                       it_cdr->fn_table->process_party_b(it_cdr, party_b);
+       if (cdr->fn_table->process_party_b
+               && !strcasecmp(cdr->party_b_name, arg)) {
+               struct ast_channel_snapshot *party_b = data;
+
+               /*
+                * For sanity's sake we also check the party_b snapshot
+                * for consistency with the key.  The callback needs and
+                * asserts the snapshot to be this way.
+                */
+               if (!cdr->party_b.snapshot
+                       || strcasecmp(cdr->party_b.snapshot->name, party_b->name)) {
+                       ast_log(LOG_NOTICE,
+                               "CDR for Party A %s(%s) has inconsistent Party B %s name.  Message can be ignored but this shouldn't happen.\n",
+                               cdr->linkedid,
+                               cdr->party_a.snapshot->name,
+                               cdr->party_b_name);
+                       return 0;
                }
+
+               cdr->fn_table->process_party_b(cdr, party_b);
        }
        return 0;
 }
@@ -2122,12 +2286,12 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription
                        return;
                }
                cdr->is_root = 1;
-               ao2_link(active_cdrs_by_channel, cdr);
+               ao2_link(active_cdrs_master, cdr);
        } else {
                const char *uniqueid;
 
                uniqueid = new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid;
-               cdr = ao2_find(active_cdrs_by_channel, uniqueid, OBJ_SEARCH_KEY);
+               cdr = ao2_find(active_cdrs_master, uniqueid, OBJ_SEARCH_KEY);
        }
 
        /* Handle Party A */
@@ -2137,44 +2301,46 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription
                name = new_snapshot ? new_snapshot->name : old_snapshot->name;
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", name);
                ast_assert(0);
-       } else {
-               ao2_lock(cdr);
-               if (new_snapshot) {
-                       int all_reject = 1;
+       } else if (new_snapshot) {
+               int all_reject = 1;
 
-                       for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-                               if (!it_cdr->fn_table->process_party_a) {
-                                       continue;
-                               }
-                               all_reject &= it_cdr->fn_table->process_party_a(it_cdr, new_snapshot);
+               ao2_lock(cdr);
+               for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
+                       if (!it_cdr->fn_table->process_party_a) {
+                               continue;
                        }
-                       if (all_reject && check_new_cdr_needed(old_snapshot, new_snapshot)) {
-                               /* We're not hung up and we have a new snapshot - we need a new CDR */
-                               struct cdr_object *new_cdr;
+                       all_reject &= it_cdr->fn_table->process_party_a(it_cdr, new_snapshot);
+               }
+               if (all_reject && check_new_cdr_needed(old_snapshot, new_snapshot)) {
+                       /* We're not hung up and we have a new snapshot - we need a new CDR */
+                       struct cdr_object *new_cdr;
 
-                               new_cdr = cdr_object_create_and_append(cdr);
-                               if (new_cdr) {
-                                       new_cdr->fn_table->process_party_a(new_cdr, new_snapshot);
-                               }
-                       }
-               } else {
-                       CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, old_snapshot->name);
-                       for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-                               cdr_object_finalize(it_cdr);
+                       new_cdr = cdr_object_create_and_append(cdr);
+                       if (new_cdr) {
+                               new_cdr->fn_table->process_party_a(new_cdr, new_snapshot);
                        }
-                       cdr_object_dispatch(cdr);
-                       ao2_unlink(active_cdrs_by_channel, cdr);
                }
                ao2_unlock(cdr);
+       } else {
+               ao2_lock(cdr);
+               CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, old_snapshot->name);
+               for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
+                       cdr_object_finalize(it_cdr);
+               }
+               cdr_object_dispatch(cdr);
+               ao2_unlock(cdr);
+
+               cdr_all_unlink(cdr);
+               ao2_unlink(active_cdrs_master, cdr);
        }
 
        /* Handle Party B */
        if (new_snapshot) {
-               ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_update_party_b,
-                       new_snapshot);
+               ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
+                       cdr_object_update_party_b, (char *) new_snapshot->name, new_snapshot);
        } else {
-               ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_finalize_party_b,
-                       old_snapshot);
+               ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
+                       cdr_object_finalize_party_b, (char *) old_snapshot->name, old_snapshot);
        }
 
        ao2_cleanup(cdr);
@@ -2186,29 +2352,25 @@ struct bridge_leave_data {
 };
 
 /*! \brief Callback used to notify CDRs of a Party B leaving the bridge */
-static int cdr_object_party_b_left_bridge_cb(void *obj, void *arg, int flags)
+static int cdr_object_party_b_left_bridge_cb(void *obj, void *arg, void *data, int flags)
 {
        struct cdr_object *cdr = obj;
-       struct bridge_leave_data *leave_data = arg;
-       struct cdr_object *it_cdr;
+       struct bridge_leave_data *leave_data = data;
+
+       if (cdr->fn_table == &bridge_state_fn_table
+               && !strcmp(cdr->bridge, leave_data->bridge->uniqueid)
+               && !strcasecmp(cdr->party_b_name, arg)) {
+               /*
+                * For sanity's sake we also assert the party_b snapshot
+                * is consistent with the key.
+                */
+               ast_assert(cdr->party_b.snapshot
+                       && !strcasecmp(cdr->party_b.snapshot->name, leave_data->channel->name));
 
-       if (strcmp(cdr->bridge, leave_data->bridge->uniqueid)) {
-               return 0;
-       }
-       for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-               if (it_cdr->fn_table != &bridge_state_fn_table) {
-                       continue;
-               }
-               if (!it_cdr->party_b.snapshot) {
-                       continue;
-               }
-               if (strcasecmp(it_cdr->party_b.snapshot->name, leave_data->channel->name)) {
-                       continue;
-               }
                /* It is our Party B, in our bridge. Set the end time and let the handler
                 * transition our CDR appropriately when we leave the bridge.
                 */
-               cdr_object_finalize(it_cdr);
+               cdr_object_finalize(cdr);
        }
        return 0;
 }
@@ -2259,7 +2421,7 @@ static void handle_bridge_leave_message(void *data, struct stasis_subscription *
                (unsigned int)stasis_message_timestamp(message)->tv_sec,
                (unsigned int)stasis_message_timestamp(message)->tv_usec);
 
-       cdr = ao2_find(active_cdrs_by_channel, channel->uniqueid, OBJ_SEARCH_KEY);
+       cdr = ao2_find(active_cdrs_master, channel->uniqueid, OBJ_SEARCH_KEY);
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", channel->name);
                ast_assert(0);
@@ -2284,8 +2446,8 @@ static void handle_bridge_leave_message(void *data, struct stasis_subscription *
        /* Party B */
        if (left_bridge
                && strcmp(bridge->subclass, "parking")) {
-               ao2_callback(active_cdrs_by_channel, OBJ_NODATA,
-                       cdr_object_party_b_left_bridge_cb,
+               ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
+                       cdr_object_party_b_left_bridge_cb, (char *) leave_data.channel->name,
                        &leave_data);
        }
 
@@ -2308,6 +2470,7 @@ static void bridge_candidate_add_to_cdr(struct cdr_object *cdr,
                return;
        }
        cdr_object_snapshot_copy(&new_cdr->party_b, party_b);
+       cdr_all_relink(new_cdr);
        cdr_object_check_party_a_answer(new_cdr);
        ast_string_field_set(new_cdr, bridge, cdr->bridge);
        cdr_object_transition_state(new_cdr, &bridge_state_fn_table);
@@ -2328,12 +2491,12 @@ static void bridge_candidate_add_to_cdr(struct cdr_object *cdr,
  * \param cand_cdr The \ref cdr_object that is a candidate
  *
  */
-static int bridge_candidate_process(struct cdr_object *cdr, struct cdr_object *base_cand_cdr)
+static void bridge_candidate_process(struct cdr_object *cdr, struct cdr_object *base_cand_cdr)
 {
        struct cdr_object_snapshot *party_a;
        struct cdr_object *cand_cdr;
 
-       SCOPED_AO2LOCK(lock, base_cand_cdr);
+       ao2_lock(base_cand_cdr);
 
        for (cand_cdr = base_cand_cdr; cand_cdr; cand_cdr = cand_cdr->next) {
                /* Skip any records that are not in this bridge */
@@ -2345,7 +2508,7 @@ static int bridge_candidate_process(struct cdr_object *cdr, struct cdr_object *b
                if (!strcasecmp(cdr->party_a.snapshot->name, cand_cdr->party_a.snapshot->name)
                        || (cdr->party_b.snapshot
                                && !strcasecmp(cdr->party_b.snapshot->name, cand_cdr->party_a.snapshot->name))) {
-                       return 0;
+                       break;
                }
 
                party_a = cdr_object_pick_party_a(&cdr->party_a, &cand_cdr->party_a);
@@ -2353,7 +2516,7 @@ static int bridge_candidate_process(struct cdr_object *cdr, struct cdr_object *b
                 * Party B */
                if (!strcasecmp(party_a->snapshot->name, cdr->party_a.snapshot->name)) {
                        bridge_candidate_add_to_cdr(cdr, &cand_cdr->party_a);
-                       return 0;
+                       break;
                }
 
                /* We're Party B. Check if we can add ourselves immediately or if we need
@@ -2366,15 +2529,18 @@ static int bridge_candidate_process(struct cdr_object *cdr, struct cdr_object *b
                                cand_cdr, cand_cdr->party_a.snapshot->name,
                                cdr->party_a.snapshot->name);
                        cdr_object_snapshot_copy(&cand_cdr->party_b, &cdr->party_a);
+                       cdr_all_relink(cand_cdr);
                        /* It's possible that this joined at one point and was never chosen
                         * as party A. Clear their end time, as it would be set in such a
                         * case.
                         */
                        memset(&cand_cdr->end, 0, sizeof(cand_cdr->end));
                }
-               return 0;
+
+               break;
        }
-       return 0;
+
+       ao2_unlock(base_cand_cdr);
 }
 
 /*!
@@ -2392,7 +2558,7 @@ static void handle_bridge_pairings(struct cdr_object *cdr, struct ast_bridge_sna
        while ((channel_id = ao2_iterator_next(&it_channels))) {
                struct cdr_object *cand_cdr;
 
-               cand_cdr = ao2_find(active_cdrs_by_channel, channel_id, OBJ_SEARCH_KEY);
+               cand_cdr = ao2_find(active_cdrs_master, channel_id, OBJ_SEARCH_KEY);
                if (cand_cdr) {
                        bridge_candidate_process(cdr, cand_cdr);
                        ao2_ref(cand_cdr, -1);
@@ -2457,6 +2623,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
 
        ao2_lock(cdr);
 
+try_again:
        for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
                if (it_cdr->fn_table->process_party_a) {
                        CDR_DEBUG("%p - Updating Party A %s snapshot\n", it_cdr,
@@ -2509,7 +2676,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
                        /* This is guaranteed to succeed: the new CDR is created in the single state
                         * and will be able to handle the bridge enter message
                         */
-                       handle_standard_bridge_enter_message(cdr, bridge, channel);
+                       goto try_again;
                }
        }
        ao2_unlock(cdr);
@@ -2544,7 +2711,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
                (unsigned int)stasis_message_timestamp(message)->tv_sec,
                (unsigned int)stasis_message_timestamp(message)->tv_usec);
 
-       cdr = ao2_find(active_cdrs_by_channel, channel->uniqueid, OBJ_SEARCH_KEY);
+       cdr = ao2_find(active_cdrs_master, channel->uniqueid, OBJ_SEARCH_KEY);
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", channel->name);
                ast_assert(0);
@@ -2594,7 +2761,7 @@ static void handle_parked_call_message(void *data, struct stasis_subscription *s
                (unsigned int)stasis_message_timestamp(message)->tv_sec,
                (unsigned int)stasis_message_timestamp(message)->tv_usec);
 
-       cdr = ao2_find(active_cdrs_by_channel, channel->uniqueid, OBJ_SEARCH_KEY);
+       cdr = ao2_find(active_cdrs_master, channel->uniqueid, OBJ_SEARCH_KEY);
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", channel->name);
                ast_assert(0);
@@ -2718,32 +2885,39 @@ int ast_cdr_backend_unsuspend(const char *name)
 
 static int cdr_generic_register(struct be_list *generic_list, const char *name, const char *desc, ast_cdrbe be)
 {
-       struct cdr_beitem *i = NULL;
+       struct cdr_beitem *i;
+       struct cdr_beitem *cur;
 
-       if (!name)
+       if (!name) {
                return -1;
+       }
 
        if (!be) {
                ast_log(LOG_WARNING, "CDR engine '%s' lacks backend\n", name);
+
+               return -1;
+       }
+
+       i = ast_calloc(1, sizeof(*i));
+       if (!i) {
                return -1;
        }
 
+       i->be = be;
+       ast_copy_string(i->name, name, sizeof(i->name));
+       ast_copy_string(i->desc, desc, sizeof(i->desc));
+
        AST_RWLIST_WRLOCK(generic_list);
-       AST_RWLIST_TRAVERSE(generic_list, i, list) {
-               if (!strcasecmp(name, i->name)) {
+       AST_RWLIST_TRAVERSE(generic_list, cur, list) {
+               if (!strcasecmp(name, cur->name)) {
                        ast_log(LOG_WARNING, "Already have a CDR backend called '%s'\n", name);
                        AST_RWLIST_UNLOCK(generic_list);
+                       ast_free(i);
+
                        return -1;
                }
        }
 
-       if (!(i = ast_calloc(1, sizeof(*i))))
-               return -1;
-
-       i->be = be;
-       ast_copy_string(i->name, name, sizeof(i->name));
-       ast_copy_string(i->desc, desc, sizeof(i->desc));
-
        AST_RWLIST_INSERT_HEAD(generic_list, i, list);
        AST_RWLIST_UNLOCK(generic_list);
 
@@ -2777,7 +2951,7 @@ static int ast_cdr_generic_unregister(struct be_list *generic_list, const char *
                return 0;
        }
 
-       active_count = ao2_container_count(active_cdrs_by_channel);
+       active_count = ao2_container_count(active_cdrs_master);
 
        if (!match->suspended && active_count != 0) {
                AST_RWLIST_UNLOCK(generic_list);
@@ -3000,7 +3174,7 @@ int ast_cdr_setvar(const char *channel_name, const char *name, const char *value
                }
        }
 
-       it_cdrs = ao2_callback(active_cdrs_by_channel, OBJ_MULTIPLE, cdr_object_select_all_by_name_cb, arg);
+       it_cdrs = ao2_callback(active_cdrs_master, OBJ_MULTIPLE, cdr_object_select_all_by_name_cb, arg);
        if (!it_cdrs) {
                ast_log(AST_LOG_ERROR, "Unable to find CDR for channel %s\n", channel_name);
                return -1;
@@ -3128,7 +3302,7 @@ static struct cdr_object *cdr_object_get_by_name(const char *name)
        }
 
        param = ast_strdupa(name);
-       return ao2_callback(active_cdrs_by_channel, 0, cdr_object_get_by_name_cb, param);
+       return ao2_callback(active_cdrs_master, 0, cdr_object_get_by_name_cb, param);
 }
 
 int ast_cdr_getvar(const char *channel_name, const char *name, char *value, size_t length)
@@ -3262,21 +3436,25 @@ struct party_b_userfield_update {
 };
 
 /*! \brief Callback used to update the userfield on Party B on all CDRs */
-static int cdr_object_update_party_b_userfield_cb(void *obj, void *arg, int flags)
+static int cdr_object_update_party_b_userfield_cb(void *obj, void *arg, void *data, int flags)
 {
        struct cdr_object *cdr = obj;
-       struct party_b_userfield_update *info = arg;
-       struct cdr_object *it_cdr;
 
-       for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-               if (it_cdr->fn_table == &finalized_state_fn_table && it_cdr->next != NULL) {
-                       continue;
-               }
-               if (it_cdr->party_b.snapshot
-                       && !strcasecmp(it_cdr->party_b.snapshot->name, info->channel_name)) {
-                       strcpy(it_cdr->party_b.userfield, info->userfield);
-               }
+       if ((cdr->fn_table != &finalized_state_fn_table || !cdr->next)
+               && !strcasecmp(cdr->party_b_name, arg)) {
+               struct party_b_userfield_update *info = data;
+
+               /*
+                * For sanity's sake we also assert the party_b snapshot
+                * is consistent with the key.
+                */
+               ast_assert(cdr->party_b.snapshot
+                       && !strcasecmp(cdr->party_b.snapshot->name, info->channel_name));
+
+               ast_copy_string(cdr->party_b.userfield, info->userfield,
+                       sizeof(cdr->party_b.userfield));
        }
+
        return 0;
 }
 
@@ -3284,8 +3462,8 @@ void ast_cdr_setuserfield(const char *channel_name, const char *userfield)
 {
        struct cdr_object *cdr;
        struct party_b_userfield_update party_b_info = {
-                       .channel_name = channel_name,
-                       .userfield = userfield,
+               .channel_name = channel_name,
+               .userfield = userfield,
        };
        struct cdr_object *it_cdr;
 
@@ -3297,15 +3475,16 @@ void ast_cdr_setuserfield(const char *channel_name, const char *userfield)
                        if (it_cdr->fn_table == &finalized_state_fn_table && it_cdr->next != NULL) {
                                continue;
                        }
-                       ast_copy_string(it_cdr->party_a.userfield, userfield, AST_MAX_USER_FIELD);
+                       ast_copy_string(it_cdr->party_a.userfield, userfield,
+                               sizeof(it_cdr->party_a.userfield));
                }
                ao2_unlock(cdr);
        }
 
        /* Handle Party B */
-       ao2_callback(active_cdrs_by_channel, OBJ_NODATA,
-                       cdr_object_update_party_b_userfield_cb,
-                       &party_b_info);
+       ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
+               cdr_object_update_party_b_userfield_cb, (char *) party_b_info.channel_name,
+               &party_b_info);
 
        ao2_cleanup(cdr);
 }
@@ -3485,6 +3664,7 @@ int ast_cdr_fork(const char *channel_name, struct ast_flags *options)
                if (cdr_obj->party_b.snapshot) {
                        new_cdr->party_b.snapshot = cdr_obj->party_b.snapshot;
                        ao2_ref(new_cdr->party_b.snapshot, +1);
+                       cdr_all_relink(new_cdr);
                        strcpy(new_cdr->party_b.userfield, cdr_obj->party_b.userfield);
                        new_cdr->party_b.flags = cdr_obj->party_b.flags;
                        if (ast_test_flag(options, AST_CDR_FLAG_KEEP_VARS)) {
@@ -3605,6 +3785,7 @@ static void cdr_submit_batch(int do_shutdown)
 static int submit_scheduled_batch(const void *data)
 {
        struct module_config *mod_cfg;
+       int nextms;
 
        cdr_submit_batch(0);
 
@@ -3613,25 +3794,23 @@ static int submit_scheduled_batch(const void *data)
                return 0;
        }
 
-       /* manually reschedule from this point in time */
-       ast_mutex_lock(&cdr_sched_lock);
-       cdr_sched = ast_sched_add(sched, mod_cfg->general->batch_settings.time * 1000, submit_scheduled_batch, NULL);
-       ast_mutex_unlock(&cdr_sched_lock);
+       /* Calculate the next scheduled interval */
+       nextms = mod_cfg->general->batch_settings.time * 1000;
 
        ao2_cleanup(mod_cfg);
-       /* returning zero so the scheduler does not automatically reschedule */
-       return 0;
+
+       return nextms;
 }
 
 /*! Do not hold the batch lock while calling this function */
-static void submit_unscheduled_batch(void)
+static void start_batch_mode(void)
 {
        /* Prevent two deletes from happening at the same time */
        ast_mutex_lock(&cdr_sched_lock);
        /* this is okay since we are not being called from within the scheduler */
        AST_SCHED_DEL(sched, cdr_sched);
        /* schedule the submission to occur ASAP (1 ms) */
-       cdr_sched = ast_sched_add(sched, 1, submit_scheduled_batch, NULL);
+       cdr_sched = ast_sched_add_variable(sched, 1, submit_scheduled_batch, NULL, 1);
        ast_mutex_unlock(&cdr_sched_lock);
 
        /* signal the do_cdr thread to wakeup early and do some work (that lazy thread ;) */
@@ -3696,9 +3875,9 @@ static void cdr_detach(struct ast_cdr *cdr)
        }
        ast_mutex_unlock(&cdr_batch_lock);
 
-       /* Don't call submit_unscheduled_batch with the cdr_batch_lock held */
+       /* Don't submit a batch with cdr_batch_lock held */
        if (submit_batch) {
-               submit_unscheduled_batch();
+               start_batch_mode();
        }
 }
 
@@ -3712,7 +3891,7 @@ static void *do_cdr(void *data)
                struct timeval now;
                schedms = ast_sched_wait(sched);
                /* this shouldn't happen, but provide a 1 second default just in case */
-               if (schedms <= 0)
+               if (schedms < 0)
                        schedms = 1000;
                now = ast_tvadd(ast_tvnow(), ast_samp2tv(schedms, 1000));
                timeout.tv_sec = now.tv_sec;
@@ -3722,7 +3901,7 @@ static void *do_cdr(void *data)
                ast_cond_timedwait(&cdr_pending_cond, &cdr_pending_lock, &timeout);
                numevents = ast_sched_runq(sched);
                ast_mutex_unlock(&cdr_pending_lock);
-               ast_debug(2, "Processed %d scheduled CDR batches from the run queue\n", numevents);
+               ast_debug(2, "Processed %d CDR batches from the run queue\n", numevents);
        }
 
        return NULL;
@@ -3771,18 +3950,14 @@ static char *handle_cli_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_a
 /*! \brief Complete user input for 'cdr show' */
 static char *cli_complete_show(struct ast_cli_args *a)
 {
-       char *result = NULL;
        int wordlen = strlen(a->word);
-       int which = 0;
        struct ao2_iterator it_cdrs;
        struct cdr_object *cdr;
 
-       it_cdrs = ao2_iterator_init(active_cdrs_by_channel, 0);
+       it_cdrs = ao2_iterator_init(active_cdrs_master, 0);
        while ((cdr = ao2_iterator_next(&it_cdrs))) {
-               if (!strncasecmp(a->word, cdr->party_a.snapshot->name, wordlen) &&
-                       (++which > a->n)) {
-                       result = ast_strdup(cdr->party_a.snapshot->name);
-                       if (result) {
+               if (!strncasecmp(a->word, cdr->party_a.snapshot->name, wordlen)) {
+                       if (ast_cli_completion_add(ast_strdup(cdr->party_a.snapshot->name))) {
                                ao2_ref(cdr, -1);
                                break;
                        }
@@ -3790,7 +3965,8 @@ static char *cli_complete_show(struct ast_cli_args *a)
                ao2_ref(cdr, -1);
        }
        ao2_iterator_destroy(&it_cdrs);
-       return result;
+
+       return NULL;
 }
 
 static void cli_show_channels(struct ast_cli_args *a)
@@ -3809,7 +3985,7 @@ static void cli_show_channels(struct ast_cli_args *a)
        ast_cli(a->fd, "--------------------------------------------------\n");
        ast_cli(a->fd, TITLE_STRING, "Channel", "Dst. Channel", "LastApp", "Start", "Answer", "End", "Billsec", "Duration");
 
-       it_cdrs = ao2_iterator_init(active_cdrs_by_channel, 0);
+       it_cdrs = ao2_iterator_init(active_cdrs_master, 0);
        for (; (cdr = ao2_iterator_next(&it_cdrs)); ao2_cleanup(cdr)) {
                struct cdr_object *it_cdr;
                struct timeval start_time = { 0, };
@@ -4040,10 +4216,10 @@ static char *handle_cli_submit(struct ast_cli_entry *e, int cmd, struct ast_cli_
 
        if (!ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) {
                ast_cli(a->fd, "Cannot submit CDR batch: CDR engine disabled.\n");
-       } else if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
+       } else if (!ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
                ast_cli(a->fd, "Cannot submit CDR batch: batch mode not enabled.\n");
        } else {
-               submit_unscheduled_batch();
+               start_batch_mode();
                ast_cli(a->fd, "Submitted CDRs to backend engines for processing.  This may take a while.\n");
        }
        ao2_cleanup(mod_cfg);
@@ -4074,7 +4250,9 @@ static int cdr_object_dispatch_all_cb(void *obj, void *arg, int flags)
        cdr_object_dispatch(cdr);
        ao2_unlock(cdr);
 
-       return 0;
+       cdr_all_unlink(cdr);
+
+       return CMP_MATCH;
 }
 
 static void finalize_batch_mode(void)
@@ -4157,7 +4335,7 @@ static int process_config(int reload)
                aco_option_register(&cfg_info, "scheduleronly", ACO_EXACT, general_options, DEFAULT_BATCH_SCHEDULER_ONLY, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, batch_settings.settings), BATCH_MODE_SCHEDULER_ONLY);
                aco_option_register(&cfg_info, "safeshutdown", ACO_EXACT, general_options, DEFAULT_BATCH_SAFE_SHUTDOWN, OPT_BOOLFLAG_T, 1, FLDSET(struct ast_cdr_config, batch_settings.settings), BATCH_MODE_SAFE_SHUTDOWN);
                aco_option_register(&cfg_info, "size", ACO_EXACT, general_options, DEFAULT_BATCH_SIZE, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.size), 0, MAX_BATCH_SIZE);
-               aco_option_register(&cfg_info, "time", ACO_EXACT, general_options, DEFAULT_BATCH_TIME, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.time), 0, MAX_BATCH_TIME);
+               aco_option_register(&cfg_info, "time", ACO_EXACT, general_options, DEFAULT_BATCH_TIME, OPT_UINT_T, PARSE_IN_RANGE, FLDSET(struct ast_cdr_config, batch_settings.time), 1, MAX_BATCH_TIME);
        }
 
        if (aco_process_config(&cfg_info, reload) == ACO_PROCESS_ERROR) {
@@ -4185,11 +4363,6 @@ static int process_config(int reload)
        return 0;
 }
 
-static void cdr_engine_cleanup(void)
-{
-       destroy_subscriptions();
-}
-
 static void cdr_engine_shutdown(void)
 {
        stasis_message_router_unsubscribe_and_join(stasis_router);
@@ -4200,8 +4373,8 @@ static void cdr_engine_shutdown(void)
 
        STASIS_MESSAGE_TYPE_CLEANUP(cdr_sync_message_type);
 
-       ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
-               NULL);
+       ao2_callback(active_cdrs_master, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK,
+               cdr_object_dispatch_all_cb, NULL);
        finalize_batch_mode();
        ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
        ast_sched_context_destroy(sched);
@@ -4212,15 +4385,17 @@ static void cdr_engine_shutdown(void)
        aco_info_destroy(&cfg_info);
        ao2_global_obj_release(module_configs);
 
-       ao2_container_unregister("cdrs_by_channel");
-       ao2_ref(active_cdrs_by_channel, -1);
-       active_cdrs_by_channel = NULL;
+       ao2_container_unregister("cdrs_master");
+       ao2_cleanup(active_cdrs_master);
+       active_cdrs_master = NULL;
+
+       ao2_container_unregister("cdrs_all");
+       ao2_cleanup(active_cdrs_all);
+       active_cdrs_all = NULL;
 }
 
 static void cdr_enable_batch_mode(struct ast_cdr_config *config)
 {
-       SCOPED_LOCK(batch, &cdr_batch_lock, ast_mutex_lock, ast_mutex_unlock);
-
        /* Only create the thread level portions once */
        if (cdr_thread == AST_PTHREADT_NULL) {
                ast_cond_init(&cdr_pending_cond, NULL);
@@ -4230,38 +4405,65 @@ static void cdr_enable_batch_mode(struct ast_cdr_config *config)
                }
        }
 
-       /* Kill the currently scheduled item */
-       AST_SCHED_DEL(sched, cdr_sched);
-       cdr_sched = ast_sched_add(sched, config->batch_settings.time * 1000, submit_scheduled_batch, NULL);
+       /* Start the batching process */
+       start_batch_mode();
+
        ast_log(LOG_NOTICE, "CDR batch mode logging enabled, first of either size %u or time %u seconds.\n",
                        config->batch_settings.size, config->batch_settings.time);
 }
 
 /*!
  * \internal
- * \brief Print channel object key (name).
+ * \brief Print master CDR container object.
  * \since 12.0.0
  *
- * \param v_obj A pointer to the object we want the key printed.
+ * \param v_obj A pointer to the object we want printed.
  * \param where User data needed by prnt to determine where to put output.
  * \param prnt Print output callback function to use.
  *
  * \return Nothing
  */
-static void cdr_container_print_fn(void *v_obj, void *where, ao2_prnt_fn *prnt)
+static void cdr_master_print_fn(void *v_obj, void *where, ao2_prnt_fn *prnt)
 {
        struct cdr_object *cdr = v_obj;
        struct cdr_object *it_cdr;
+
        if (!cdr) {
                return;
        }
        for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
-               prnt(where, "Party A: %s; Party B: %s; Bridge %s\n", it_cdr->party_a.snapshot->name, it_cdr->party_b.snapshot ? it_cdr->party_b.snapshot->name : "<unknown>",
-                               it_cdr->bridge);
+               prnt(where, "Party A: %s; Party B: %s; Bridge %s\n",
+                       it_cdr->party_a.snapshot->name,
+                       it_cdr->party_b.snapshot ? it_cdr->party_b.snapshot->name : "<unknown>",
+                       it_cdr->bridge);
        }
 }
 
 /*!
+ * \internal
+ * \brief Print all CDR container object.
+ * \since 13.19.0
+ *
+ * \param v_obj A pointer to the object we want printed.
+ * \param where User data needed by prnt to determine where to put output.
+ * \param prnt Print output callback function to use.
+ *
+ * \return Nothing
+ */
+static void cdr_all_print_fn(void *v_obj, void *where, ao2_prnt_fn *prnt)
+{
+       struct cdr_object *cdr = v_obj;
+
+       if (!cdr) {
+               return;
+       }
+       prnt(where, "Party A: %s; Party B: %s; Bridge %s",
+               cdr->party_a.snapshot->name,
+               cdr->party_b.snapshot ? cdr->party_b.snapshot->name : "<unknown>",
+               cdr->bridge);
+}
+
+/*!
  * \brief Checks if CDRs are enabled and enables/disables the necessary options
  */
 static int cdr_toggle_runtime_options(void)
@@ -4291,26 +4493,33 @@ static int cdr_toggle_runtime_options(void)
        return mod_cfg ? 0 : -1;
 }
 
-int ast_cdr_engine_init(void)
+static int unload_module(void)
+{
+       destroy_subscriptions();
+
+       return 0;
+}
+
+static int load_module(void)
 {
        if (process_config(0)) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        cdr_topic = stasis_topic_create("cdr_engine");
        if (!cdr_topic) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        stasis_router = stasis_message_router_create(cdr_topic);
        if (!stasis_router) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
        stasis_message_router_set_congestion_limits(stasis_router, -1,
                10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
        if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
@@ -4320,24 +4529,30 @@ int ast_cdr_engine_init(void)
        stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
        stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL);
 
-       active_cdrs_by_channel = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
-               NUM_CDR_BUCKETS, cdr_object_channel_hash_fn, NULL, cdr_object_channel_cmp_fn);
-       if (!active_cdrs_by_channel) {
-               return -1;
+       active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
+               NUM_CDR_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn);
+       if (!active_cdrs_master) {
+               return AST_MODULE_LOAD_FAILURE;
+       }
+       ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn);
+
+       active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
+               NUM_CDR_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn);
+       if (!active_cdrs_all) {
+               return AST_MODULE_LOAD_FAILURE;
        }
-       ao2_container_register("cdrs_by_channel", active_cdrs_by_channel, cdr_container_print_fn);
+       ao2_container_register("cdrs_all", active_cdrs_all, cdr_all_print_fn);
 
        sched = ast_sched_context_create();
        if (!sched) {
                ast_log(LOG_ERROR, "Unable to create schedule context.\n");
-               return -1;
+               return AST_MODULE_LOAD_FAILURE;
        }
 
        ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
-       ast_register_cleanup(cdr_engine_cleanup);
        ast_register_atexit(cdr_engine_shutdown);
 
-       return cdr_toggle_runtime_options();
+       return cdr_toggle_runtime_options() ? AST_MODULE_LOAD_FAILURE : AST_MODULE_LOAD_SUCCESS;
 }
 
 void ast_cdr_engine_term(void)
@@ -4380,7 +4595,7 @@ void ast_cdr_engine_term(void)
        }
 }
 
-int ast_cdr_engine_reload(void)
+static int reload_module(void)
 {
        struct module_config *old_mod_cfg;
        struct module_config *mod_cfg;
@@ -4406,3 +4621,12 @@ int ast_cdr_engine_reload(void)
        ao2_cleanup(old_mod_cfg);
        return cdr_toggle_runtime_options();
 }
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "CDR Engine",
+       .support_level = AST_MODULE_SUPPORT_CORE,
+       .load = load_module,
+       .unload = unload_module,
+       .reload = reload_module,
+       .load_pri = AST_MODPRI_CORE,
+       .requires = "extconfig",
+);