Replace most uses of ast_register_atexit with ast_register_cleanup.
[asterisk/asterisk.git] / main / cdr.c
index 593f471..5e24dae 100644 (file)
@@ -98,19 +98,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
                                        </description>
                                </configOption>
                                <configOption name="unanswered">
-                                       <synopsis>Log calls that are never answered.</synopsis>
-                                       <description><para>Define whether or not to log unanswered calls. Setting this to "yes" will
-                                       report every attempt to ring a phone in dialing attempts, when it was not
-                                       answered. For example, if you try to dial 3 extensions, and this option is "yes",
-                                       you will get 3 CDR's, one for each phone that was rung. Some find this information horribly
-                                       useless. Others find it very valuable. Note, in "yes" mode, you will see one CDR, with one of
-                                       the call targets on one side, and the originating channel on the other, and then one CDR for
-                                       each channel attempted. This may seem redundant, but cannot be helped.</para>
-                                       <para>In brief, this option controls the reporting of unanswered calls which only have an A
-                                       party. Calls which get offered to an outgoing line, but are unanswered, are still
-                                       logged, and that is the intended behavior. (It also results in some B side CDRs being
-                                       output, as they have the B side channel as their source channel, and no destination
-                                       channel.)</para>
+                                       <synopsis>Log calls that are never answered and don't set an outgoing party.</synopsis>
+                                       <description><para>
+                                       Define whether or not to log unanswered calls that don't involve an outgoing party. Setting
+                                       this to "yes" will make calls to extensions that don't answer and don't set a side B channel
+                                       (such as by using the Dial application) receive CDR log entries. If this option is set to
+                                       "no", then those log entries will not be created. Unanswered calls which get offered to an
+                                       outgoing line will always receive log entries regardless of this option, and that is the
+                                       intended behavior.
+                                       </para>
                                        </description>
                                </configOption>
                                <configOption name="congestion">
@@ -129,7 +125,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
                                                channel. Any statistics are gathered from this new CDR. By enabling
                                                this option, no new CDR is created for the dialplan logic that is
                                                executed in <literal>h</literal> extensions or attached hangup handler
-                                               subroutines. The default value is <literal>no</literal>, indicating
+                                               subroutines. The default value is <literal>yes</literal>, indicating
                                                that a CDR will be generated during hangup logic.</para>
                                        </description>
                                </configOption>
@@ -203,7 +199,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #define DEFAULT_BATCHMODE "0"
 #define DEFAULT_UNANSWERED "0"
 #define DEFAULT_CONGESTION "0"
-#define DEFAULT_END_BEFORE_H_EXTEN "0"
+#define DEFAULT_END_BEFORE_H_EXTEN "1"
 #define DEFAULT_INITIATED_SECONDS "0"
 
 #define DEFAULT_BATCH_SIZE "100"
@@ -215,9 +211,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #define CDR_DEBUG(mod_cfg, fmt, ...) \
        do { \
-       if (ast_test_flag(&(mod_cfg)->general->settings, CDR_DEBUG)) { \
-               ast_verb(1, (fmt), ##__VA_ARGS__); \
-       } } while (0)
+               if (ast_test_flag(&(mod_cfg)->general->settings, CDR_DEBUG)) { \
+                       ast_verbose((fmt), ##__VA_ARGS__); \
+               } \
+       } while (0)
 
 static void cdr_detach(struct ast_cdr *cdr);
 static void cdr_submit_batch(int shutdown);
@@ -347,6 +344,9 @@ static struct stasis_forward *parking_subscription;
 /*! \brief The parent topic for all topics we want to aggregate for CDRs */
 static struct stasis_topic *cdr_topic;
 
+/*! \brief A message type used to synchronize with the CDR topic */
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_sync_message_type);
+
 struct cdr_object;
 
 /*! \brief Return types for \ref process_bridge_enter functions */
@@ -689,6 +689,8 @@ struct cdr_object {
                AST_STRING_FIELD(bridge);           /*!< The bridge the party A happens to be in. */
                AST_STRING_FIELD(appl);             /*!< The last accepted application party A was in */
                AST_STRING_FIELD(data);             /*!< The data for the last accepted application party A was in */
+               AST_STRING_FIELD(context);          /*!< The accepted context for Party A */
+               AST_STRING_FIELD(exten);            /*!< The accepted extension for Party A */
        );
        struct cdr_object *next;                /*!< The next CDR object in the chain */
        struct cdr_object *last;                /*!< The last CDR object in the chain */
@@ -1082,14 +1084,13 @@ static struct ast_cdr *cdr_object_create_public_records(struct cdr_object *cdr)
        struct ast_var_t *it_var, *it_copy_var;
        struct ast_channel_snapshot *party_a;
        struct ast_channel_snapshot *party_b;
-       RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
 
        for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
                struct ast_cdr *cdr_copy;
 
                /* Don't create records for CDRs where the party A was a dialed channel */
                if (snapshot_is_dialed(it_cdr->party_a.snapshot) && !it_cdr->party_b.snapshot) {
-                       CDR_DEBUG(mod_cfg, "%p - %s is dialed and has no Party B; discarding\n", it_cdr,
+                       ast_debug(1, "CDR for %s is dialed and has no Party B; discarding\n",
                                it_cdr->party_a.snapshot->name);
                        continue;
                }
@@ -1113,8 +1114,8 @@ static struct ast_cdr *cdr_object_create_public_records(struct cdr_object *cdr)
                ast_copy_string(cdr_copy->uniqueid, party_a->uniqueid, sizeof(cdr_copy->uniqueid));
                ast_copy_string(cdr_copy->lastapp, it_cdr->appl, sizeof(cdr_copy->lastapp));
                ast_copy_string(cdr_copy->lastdata, it_cdr->data, sizeof(cdr_copy->lastdata));
-               ast_copy_string(cdr_copy->dst, party_a->exten, sizeof(cdr_copy->dst));
-               ast_copy_string(cdr_copy->dcontext, party_a->context, sizeof(cdr_copy->dcontext));
+               ast_copy_string(cdr_copy->dst, it_cdr->exten, sizeof(cdr_copy->dst));
+               ast_copy_string(cdr_copy->dcontext, it_cdr->context, sizeof(cdr_copy->dcontext));
 
                /* Party B */
                if (party_b) {
@@ -1236,9 +1237,6 @@ static void cdr_object_set_disposition(struct cdr_object *cdr, int hangupcause)
  */
 static void cdr_object_finalize(struct cdr_object *cdr)
 {
-       RAII_VAR(struct module_config *, mod_cfg,
-                       ao2_global_obj_ref(module_configs), ao2_cleanup);
-
        if (!ast_tvzero(cdr->end)) {
                return;
        }
@@ -1259,11 +1257,11 @@ static void cdr_object_finalize(struct cdr_object *cdr)
        /* tv_usec is suseconds_t, which could be int or long */
        ast_debug(1, "Finalized CDR for %s - start %ld.%06ld answer %ld.%06ld end %ld.%06ld dispo %s\n",
                        cdr->party_a.snapshot->name,
-                       cdr->start.tv_sec,
+                       (long)cdr->start.tv_sec,
                        (long)cdr->start.tv_usec,
-                       cdr->answer.tv_sec,
+                       (long)cdr->answer.tv_sec,
                        (long)cdr->answer.tv_usec,
-                       cdr->end.tv_sec,
+                       (long)cdr->end.tv_sec,
                        (long)cdr->end.tv_usec,
                        ast_cdr_disp2str(cdr->disposition));
 }
@@ -1298,7 +1296,7 @@ static void cdr_object_check_party_a_answer(struct cdr_object *cdr) {
                cdr->answer = ast_tvnow();
                /* tv_usec is suseconds_t, which could be int or long */
                CDR_DEBUG(mod_cfg, "%p - Set answered time to %ld.%06ld\n", cdr,
-                       cdr->answer.tv_sec,
+                       (long)cdr->answer.tv_sec,
                        (long)cdr->answer.tv_usec);
        }
 }
@@ -1347,6 +1345,24 @@ static int base_process_party_a(struct cdr_object *cdr, struct ast_channel_snaps
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
 
        ast_assert(strcasecmp(snapshot->name, cdr->party_a.snapshot->name) == 0);
+
+       /* Ignore any snapshots from a dead or dying channel */
+       if (ast_test_flag(&snapshot->softhangup_flags, AST_SOFTHANGUP_HANGUP_EXEC)
+                       && ast_test_flag(&mod_cfg->general->settings, CDR_END_BEFORE_H_EXTEN)) {
+               cdr_object_check_party_a_hangup(cdr);
+               return 0;
+       }
+
+       /*
+        * Only record the context and extension if we aren't in a subroutine, or if
+        * we are executing hangup logic.
+        */
+       if (!ast_test_flag(&snapshot->flags, AST_FLAG_SUBROUTINE_EXEC)
+               || ast_test_flag(&snapshot->softhangup_flags, AST_SOFTHANGUP_HANGUP_EXEC)) {
+               ast_string_field_set(cdr, context, snapshot->context);
+               ast_string_field_set(cdr, exten, snapshot->exten);
+       }
+
        cdr_object_swap_snapshot(&cdr->party_a, snapshot);
 
        /* When Party A is originated to an application and the application exits, the stack
@@ -1359,6 +1375,15 @@ static int base_process_party_a(struct cdr_object *cdr, struct ast_channel_snaps
                        && !ast_test_flag(&cdr->flags, AST_CDR_LOCK_APP)) {
                ast_string_field_set(cdr, appl, snapshot->appl);
                ast_string_field_set(cdr, data, snapshot->data);
+
+               /* Dial (app_dial) is a special case. Because pre-dial handlers, which
+                * execute before the dial begins, will alter the application/data to
+                * something people typically don't want to see, if we see a channel enter
+                * into Dial here, we set the appl/data accordingly and lock it.
+                */
+               if (!strcmp(snapshot->appl, "Dial")) {
+                       ast_set_flag(&cdr->flags, AST_CDR_LOCK_APP);
+               }
        }
 
        ast_string_field_set(cdr, linkedid, snapshot->linkedid);
@@ -1428,15 +1453,21 @@ static int single_state_process_dial_begin(struct cdr_object *cdr, struct ast_ch
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
 
        if (caller && !strcasecmp(cdr->party_a.snapshot->name, caller->name)) {
-               cdr_object_swap_snapshot(&cdr->party_a, caller);
+               base_process_party_a(cdr, caller);
                CDR_DEBUG(mod_cfg, "%p - Updated Party A %s snapshot\n", cdr,
                                cdr->party_a.snapshot->name);
                cdr_object_swap_snapshot(&cdr->party_b, peer);
                CDR_DEBUG(mod_cfg, "%p - Updated Party B %s snapshot\n", cdr,
                                cdr->party_b.snapshot->name);
+
+               /* If we have two parties, lock the application that caused the
+                * two parties to be associated. This prevents mid-call event
+                * macros/gosubs from perturbing the CDR application/data
+                */
+               ast_set_flag(&cdr->flags, AST_CDR_LOCK_APP);
        } else if (!strcasecmp(cdr->party_a.snapshot->name, peer->name)) {
                /* We're the entity being dialed, i.e., outbound origination */
-               cdr_object_swap_snapshot(&cdr->party_a, peer);
+               base_process_party_a(cdr, peer);
                CDR_DEBUG(mod_cfg, "%p - Updated Party A %s snapshot\n", cdr,
                                cdr->party_a.snapshot->name);
        }
@@ -1619,8 +1650,6 @@ static enum ast_cdr_disposition dial_status_to_disposition(const char *dial_stat
 
 static int dial_state_process_dial_end(struct cdr_object *cdr, struct ast_channel_snapshot *caller, struct ast_channel_snapshot *peer, const char *dial_status)
 {
-       RAII_VAR(struct module_config *, mod_cfg,
-                       ao2_global_obj_ref(module_configs), ao2_cleanup);
        struct ast_channel_snapshot *party_a;
 
        if (caller) {
@@ -1822,9 +1851,8 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch
        RAII_VAR(struct module_config *, mod_cfg,
                ao2_global_obj_ref(module_configs), ao2_cleanup);
 
-       /* If we ignore hangup logic, indicate that we don't need a new CDR */
-       if (ast_test_flag(&mod_cfg->general->settings, CDR_END_BEFORE_H_EXTEN)
-               && ast_test_flag(&snapshot->softhangup_flags, AST_SOFTHANGUP_HANGUP_EXEC)) {
+       if (ast_test_flag(&snapshot->softhangup_flags, AST_SOFTHANGUP_HANGUP_EXEC)
+                       && ast_test_flag(&mod_cfg->general->settings, CDR_END_BEFORE_H_EXTEN)) {
                return 0;
        }
 
@@ -1912,6 +1940,7 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
 
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", caller ? caller->name : peer->name);
+               ast_assert(0);
                return;
        }
 
@@ -1922,7 +1951,7 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
                                continue;
                        }
                        CDR_DEBUG(mod_cfg, "%p - Processing Dial Begin message for channel %s, peer %s\n",
-                                       cdr,
+                                       it_cdr,
                                        caller ? caller->name : "(none)",
                                        peer ? peer->name : "(none)");
                        res &= it_cdr->fn_table->process_dial_begin(it_cdr,
@@ -1933,7 +1962,7 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
                                continue;
                        }
                        CDR_DEBUG(mod_cfg, "%p - Processing Dial End message for channel %s, peer %s\n",
-                                       cdr,
+                                       it_cdr,
                                        caller ? caller->name : "(none)",
                                        peer ? peer->name : "(none)");
                        it_cdr->fn_table->process_dial_end(it_cdr,
@@ -1949,6 +1978,7 @@ static void handle_dial_message(void *data, struct stasis_subscription *sub, str
 
                new_cdr = cdr_object_create_and_append(cdr);
                if (!new_cdr) {
+                       ao2_unlock(cdr);
                        return;
                }
                new_cdr->fn_table->process_dial_begin(new_cdr,
@@ -1999,11 +2029,10 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
        RAII_VAR(struct module_config *, mod_cfg,
                        ao2_global_obj_ref(module_configs), ao2_cleanup);
 
-       if (!new_snapshot) {
-               return 0;
-       }
-
-       if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
+       /* If we're dead, we don't need a new CDR */
+       if (!new_snapshot
+               || (ast_test_flag(&new_snapshot->softhangup_flags, AST_SOFTHANGUP_HANGUP_EXEC)
+                       && ast_test_flag(&mod_cfg->general->settings, CDR_END_BEFORE_H_EXTEN))) {
                return 0;
        }
 
@@ -2063,6 +2092,7 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription
        }
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", name);
+               ast_assert(0);
        } else {
                ao2_lock(cdr);
                if (new_snapshot) {
@@ -2188,6 +2218,7 @@ static void handle_bridge_leave_message(void *data, struct stasis_subscription *
 
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", channel->name);
+               ast_assert(0);
                return;
        }
 
@@ -2204,11 +2235,10 @@ static void handle_bridge_leave_message(void *data, struct stasis_subscription *
                        left_bridge = 1;
                }
        }
+       ao2_unlock(cdr);
        if (!left_bridge) {
-               ao2_unlock(cdr);
                return;
        }
-       ao2_unlock(cdr);
 
        if (strcmp(bridge->subclass, "parking")) {
                /* Party B */
@@ -2302,6 +2332,7 @@ static int bridge_candidate_process(struct cdr_object *cdr, struct cdr_object *b
                         */
                        memset(&cand_cdr->end, 0, sizeof(cand_cdr->end));
                }
+               return 0;
        }
        return 0;
 }
@@ -2412,10 +2443,10 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
                                if (!handled_cdr) {
                                        handled_cdr = it_cdr;
                                }
-                       break;
+                               break;
                        case BRIDGE_ENTER_NEED_CDR:
                                /* Pass */
-                       break;
+                               break;
                        case BRIDGE_ENTER_NO_PARTY_B:
                                /* We didn't win on any - end this CDR. If someone else comes in later
                                 * that is Party B to this CDR, it can re-activate this CDR.
@@ -2424,7 +2455,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
                                        handled_cdr = it_cdr;
                                }
                                cdr_object_finalize(cdr);
-                       break;
+                               break;
                        }
                }
        }
@@ -2486,6 +2517,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
 
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", channel->name);
+               ast_assert(0);
                return;
        }
 
@@ -2536,6 +2568,7 @@ static void handle_parked_call_message(void *data, struct stasis_subscription *s
        cdr = ao2_find(active_cdrs_by_channel, channel->uniqueid, OBJ_KEY);
        if (!cdr) {
                ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", channel->name);
+               ast_assert(0);
                return;
        }
 
@@ -2562,6 +2595,19 @@ static void handle_parked_call_message(void *data, struct stasis_subscription *s
 
 }
 
+/*!
+ * \brief Handler for a synchronization message
+ * \param data Passed on
+ * \param sub The stasis subscription for this message callback
+ * \param topic The topic this message was published for
+ * \param message A blank ao2 object
+ * */
+static void handle_cdr_sync_message(void *data, struct stasis_subscription *sub,
+               struct stasis_message *message)
+{
+       return;
+}
+
 struct ast_cdr_config *ast_cdr_get_config(void)
 {
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@@ -2972,7 +3018,7 @@ static int cdr_object_format_property(struct cdr_object *cdr_obj, const char *na
        } else if (!strcasecmp(name, "billsec")) {
                snprintf(value, length, "%ld", cdr_object_get_billsec(cdr_obj));
        } else if (!strcasecmp(name, "disposition")) {
-               snprintf(value, length, "%d", cdr_obj->disposition);
+               snprintf(value, length, "%u", cdr_obj->disposition);
        } else if (!strcasecmp(name, "amaflags")) {
                snprintf(value, length, "%d", party_a->amaflags);
        } else if (!strcasecmp(name, "accountcode")) {
@@ -2990,7 +3036,7 @@ static int cdr_object_format_property(struct cdr_object *cdr_obj, const char *na
        } else if (!strcasecmp(name, "userfield")) {
                ast_copy_string(value, cdr_obj->party_a.userfield, length);
        } else if (!strcasecmp(name, "sequence")) {
-               snprintf(value, length, "%d", cdr_obj->sequence);
+               snprintf(value, length, "%u", cdr_obj->sequence);
        } else {
                return 1;
        }
@@ -3318,6 +3364,7 @@ int ast_cdr_fork(const char *channel_name, struct ast_flags *options)
 
        {
                SCOPED_AO2LOCK(lock, cdr);
+
                cdr_obj = cdr->last;
                if (cdr_obj->fn_table == &finalized_state_fn_table) {
                        /* If the last CDR in the chain is finalized, don't allow a fork -
@@ -3336,7 +3383,16 @@ int ast_cdr_fork(const char *channel_name, struct ast_flags *options)
                }
                new_cdr->fn_table = cdr_obj->fn_table;
                ast_string_field_set(new_cdr, bridge, cdr->bridge);
+               ast_string_field_set(new_cdr, appl, cdr->appl);
+               ast_string_field_set(new_cdr, data, cdr->data);
+               ast_string_field_set(new_cdr, context, cdr->context);
+               ast_string_field_set(new_cdr, exten, cdr->exten);
                new_cdr->flags = cdr->flags;
+               /* Explicitly clear the AST_CDR_LOCK_APP flag - we want
+                * the application to be changed on the new CDR if the
+                * dialplan demands it
+                */
+               ast_clear_flag(&new_cdr->flags, AST_CDR_LOCK_APP);
 
                /* If there's a Party B, copy it over as well */
                if (cdr_obj->party_b.snapshot) {
@@ -3460,7 +3516,7 @@ static int submit_scheduled_batch(const void *data)
        /* manually reschedule from this point in time */
 
        ast_mutex_lock(&cdr_sched_lock);
-       cdr_sched = ast_sched_add(sched, mod_cfg->general->batch_settings.size * 1000, submit_scheduled_batch, NULL);
+       cdr_sched = ast_sched_add(sched, mod_cfg->general->batch_settings.time * 1000, submit_scheduled_batch, NULL);
        ast_mutex_unlock(&cdr_sched_lock);
        /* returning zero so the scheduler does not automatically reschedule */
        return 0;
@@ -3819,8 +3875,8 @@ static char *handle_cli_status(struct ast_cli_entry *e, int cmd, struct ast_cli_
                        ast_cli(a->fd, "  Safe shutdown:              %s\n", ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SAFE_SHUTDOWN) ? "Enabled" : "Disabled");
                        ast_cli(a->fd, "  Threading model:            %s\n", ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SCHEDULER_ONLY) ? "Scheduler only" : "Scheduler plus separate threads");
                        ast_cli(a->fd, "  Current batch size:         %d record%s\n", cnt, ESS(cnt));
-                       ast_cli(a->fd, "  Maximum batch size:         %d record%s\n", mod_cfg->general->batch_settings.size, ESS(mod_cfg->general->batch_settings.size));
-                       ast_cli(a->fd, "  Maximum batch time:         %d second%s\n", mod_cfg->general->batch_settings.time, ESS(mod_cfg->general->batch_settings.time));
+                       ast_cli(a->fd, "  Maximum batch size:         %u record%s\n", mod_cfg->general->batch_settings.size, ESS(mod_cfg->general->batch_settings.size));
+                       ast_cli(a->fd, "  Maximum batch time:         %u second%s\n", mod_cfg->general->batch_settings.time, ESS(mod_cfg->general->batch_settings.time));
                        ast_cli(a->fd, "  Next batch processing time: %ld second%s\n\n", nextbatchtime, ESS(nextbatchtime));
                }
                ast_cli(a->fd, "* Registered Backends\n");
@@ -3915,17 +3971,21 @@ static void finalize_batch_mode(void)
        ast_cdr_engine_term();
 }
 
+struct stasis_message_router *ast_cdr_message_router(void)
+{
+       if (!stasis_router) {
+               return NULL;
+       }
+
+       ao2_bump(stasis_router);
+       return stasis_router;
+}
+
 /*!
- * \brief Destroy the active Stasis subscriptions/router/topics
+ * \brief Destroy the active Stasis subscriptions
  */
 static void destroy_subscriptions(void)
 {
-       stasis_message_router_unsubscribe_and_join(stasis_router);
-       stasis_router = NULL;
-
-       ao2_cleanup(cdr_topic);
-       cdr_topic = NULL;
-
        channel_subscription = stasis_forward_cancel(channel_subscription);
        bridge_subscription = stasis_forward_cancel(bridge_subscription);
        parking_subscription = stasis_forward_cancel(parking_subscription);
@@ -3936,16 +3996,14 @@ static void destroy_subscriptions(void)
  */
 static int create_subscriptions(void)
 {
-       /* Use the CDR topic to determine if we've already created this */
-       if (cdr_topic) {
-               return 0;
-       }
-
-       cdr_topic = stasis_topic_create("cdr_engine");
        if (!cdr_topic) {
                return -1;
        }
 
+       if (channel_subscription || bridge_subscription || parking_subscription) {
+               return 0;
+       }
+
        channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
        if (!channel_subscription) {
                return -1;
@@ -3959,16 +4017,6 @@ static int create_subscriptions(void)
                return -1;
        }
 
-       stasis_router = stasis_message_router_create(cdr_topic);
-       if (!stasis_router) {
-               return -1;
-       }
-       stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
-       stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
-       stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
-       stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
-       stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
-
        return 0;
 }
 
@@ -4019,6 +4067,14 @@ static void cdr_engine_cleanup(void)
 
 static void cdr_engine_shutdown(void)
 {
+       stasis_message_router_unsubscribe_and_join(stasis_router);
+       stasis_router = NULL;
+
+       ao2_cleanup(cdr_topic);
+       cdr_topic = NULL;
+
+       STASIS_MESSAGE_TYPE_CLEANUP(cdr_sync_message_type);
+
        ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
                NULL);
        finalize_batch_mode();
@@ -4052,7 +4108,7 @@ static void cdr_enable_batch_mode(struct ast_cdr_config *config)
        /* Kill the currently scheduled item */
        AST_SCHED_DEL(sched, cdr_sched);
        cdr_sched = ast_sched_add(sched, config->batch_settings.time * 1000, submit_scheduled_batch, NULL);
-       ast_log(LOG_NOTICE, "CDR batch mode logging enabled, first of either size %d or time %d seconds.\n",
+       ast_log(LOG_NOTICE, "CDR batch mode logging enabled, first of either size %u or time %u seconds.\n",
                        config->batch_settings.size, config->batch_settings.time);
 }
 
@@ -4113,6 +4169,27 @@ int ast_cdr_engine_init(void)
                return -1;
        }
 
+       cdr_topic = stasis_topic_create("cdr_engine");
+       if (!cdr_topic) {
+               return -1;
+       }
+
+       stasis_router = stasis_message_router_create(cdr_topic);
+       if (!stasis_router) {
+               return -1;
+       }
+
+       if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
+               return -1;
+       }
+
+       stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+       stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
+       stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
+       stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
+       stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
+       stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL);
+
        active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS,
                cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn);
        if (!active_cdrs_by_channel) {
@@ -4136,6 +4213,8 @@ int ast_cdr_engine_init(void)
 void ast_cdr_engine_term(void)
 {
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
+       RAII_VAR(void *, payload, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
 
        /* Since this is called explicitly during process shutdown, we might not have ever
         * been initialized. If so, the config object will be NULL.
@@ -4143,10 +4222,25 @@ void ast_cdr_engine_term(void)
        if (!mod_cfg) {
                return;
        }
-       if (!ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
-               return;
+
+       if (cdr_sync_message_type()) {
+               /* Make sure we have the needed items */
+               payload = ao2_alloc(sizeof(*payload), NULL);
+               if (!stasis_router || !payload) {
+                       return;
+               }
+
+               ast_debug(1, "CDR Engine termination request received; waiting on messages...\n");
+
+               message = stasis_message_create(cdr_sync_message_type(), payload);
+               if (message) {
+                       stasis_message_router_publish_sync(stasis_router, message);
+               }
+       }
+
+       if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) {
+               cdr_submit_batch(ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SAFE_SHUTDOWN));
        }
-       cdr_submit_batch(ast_test_flag(&mod_cfg->general->batch_settings.settings, BATCH_MODE_SAFE_SHUTDOWN));
 }
 
 int ast_cdr_engine_reload(void)