Merge "res_pjsip: New endpoint option "refer_blind_progress""
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
index 371cceb..a1f3f24 100644 (file)
@@ -42,6 +42,7 @@
 #include "asterisk/res_pjsip.h"
 #include "asterisk/callerid.h"
 #include "asterisk/manager.h"
+#include "asterisk/cli.h"
 #include "asterisk/test.h"
 #include "res_pjsip/include/res_pjsip_private.h"
 #include "asterisk/res_pjsip_presence_xml.h"
                                <configOption name="expires">
                                        <synopsis>The time at which the subscription expires</synopsis>
                                </configOption>
+                               <configOption name="contact_uri">
+                                       <synopsis>The Contact URI of the dialog for the subscription</synopsis>
+                               </configOption>
                        </configObject>
                        <configObject name="resource_list">
                                <synopsis>Resource list configuration parameters.</synopsis>
@@ -355,7 +359,7 @@ struct ast_sip_publication {
 struct subscription_persistence {
        /*! Sorcery object details */
        SORCERY_OBJECT(details);
-       /*! The name of the endpoint involved in the subscrption */
+       /*! The name of the endpoint involved in the subscription */
        char *endpoint;
        /*! SIP message that creates the subscription */
        char packet[PJSIP_MAX_PKT_LEN];
@@ -375,6 +379,29 @@ struct subscription_persistence {
        char *tag;
        /*! When this subscription expires */
        struct timeval expires;
+       /*! Contact URI */
+       char contact_uri[PJSIP_MAX_URL_SIZE];
+};
+
+/*!
+ * \brief The state of the subscription tree
+ */
+enum sip_subscription_tree_state {
+       /*! Normal operation */
+       SIP_SUB_TREE_NORMAL = 0,
+       /*! A terminate has been requested by Asterisk, the client, or pjproject */
+       SIP_SUB_TREE_TERMINATE_PENDING,
+       /*! The terminate is in progress */
+       SIP_SUB_TREE_TERMINATE_IN_PROGRESS,
+       /*! The terminate process has finished and the subscription tree is no longer valid */
+       SIP_SUB_TREE_TERMINATED,
+};
+
+static char *sub_tree_state_description[] = {
+       "Normal",
+       "TerminatePending",
+       "TerminateInProgress",
+       "Terminated"
 };
 
 /*!
@@ -411,8 +438,13 @@ struct sip_subscription_tree {
        int is_list;
        /*! Next item in the list */
        AST_LIST_ENTRY(sip_subscription_tree) next;
-       /*! Indicates that a NOTIFY is currently being sent on the SIP subscription */
-       int last_notify;
+       /*! Subscription tree state */
+       enum sip_subscription_tree_state state;
+       /*! On asterisk restart, this is the task data used
+        * to restart the expiration timer if pjproject isn't
+        * capable of restarting the timer.
+        */
+       struct ast_sip_sched_task *expiration_task;
 };
 
 /*!
@@ -467,6 +499,17 @@ static const char *sip_subscription_roles_map[] = {
        [AST_SIP_NOTIFIER] = "Notifier"
 };
 
+enum sip_persistence_update_type {
+       /*! Called from send request */
+       SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0,
+       /*! Subscription created from initial client request */
+       SUBSCRIPTION_PERSISTENCE_CREATED,
+       /*! Subscription recreated by asterisk on startup */
+       SUBSCRIPTION_PERSISTENCE_RECREATED,
+       /*! Subscription created from client refresh */
+       SUBSCRIPTION_PERSISTENCE_REFRESHED,
+};
+
 AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
 
 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
@@ -545,7 +588,7 @@ static struct subscription_persistence *subscription_persistence_create(struct s
 
 /*! \brief Function which updates persistence information of a subscription in sorcery */
 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
-       pjsip_rx_data *rdata)
+       pjsip_rx_data *rdata, enum sip_persistence_update_type type)
 {
        pjsip_dialog *dlg;
 
@@ -553,28 +596,38 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr
                return;
        }
 
+       ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,
+               sub_tree->root->resource);
+
        dlg = sub_tree->dlg;
        sub_tree->persistence->cseq = dlg->local.cseq;
 
        if (rdata) {
                int expires;
                pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+               pjsip_contact_hdr *contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
 
                expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
                sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
 
+               pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,
+                       sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));
+
                /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
                 * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
                 * will always point to the proper SIP message that is to be processed. When updating subscription
                 * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
                 * only ever have a single SIP message on it, and so we base persistence on that.
                 */
-               if (rdata->msg_info.msg_buf) {
-                       ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
-                                       MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
-               } else {
-                       ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
-                                       sizeof(sub_tree->persistence->packet));
+               if (type == SUBSCRIPTION_PERSISTENCE_CREATED
+                       || type == SUBSCRIPTION_PERSISTENCE_RECREATED) {
+                       if (rdata->msg_info.msg_buf) {
+                               ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
+                                               MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
+                       } else {
+                               ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
+                                               sizeof(sub_tree->persistence->packet));
+                       }
                }
                ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
                                sizeof(sub_tree->persistence->src_name));
@@ -598,6 +651,7 @@ static void subscription_persistence_remove(struct sip_subscription_tree *sub_tr
 
        ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
        ao2_ref(sub_tree->persistence, -1);
+       sub_tree->persistence = NULL;
 }
 
 
@@ -874,19 +928,20 @@ static void build_node_children(struct ast_sip_endpoint *endpoint, const struct
                        if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
                                current = tree_node_alloc(resource, visited, 0);
                                if (!current) {
-                                       ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
-                                                       "allocation error afterwards\n", resource);
+                                       ast_debug(1,
+                                               "Subscription to leaf resource %s was successful, but encountered allocation error afterwards\n",
+                                               resource);
                                        continue;
                                }
-                               ast_debug(1, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
+                               ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
                                                resource, parent->resource);
                                AST_VECTOR_APPEND(&parent->children, current);
                        } else {
-                               ast_debug(1, "Subscription to leaf resource %s resulted in error response %d\n",
+                               ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
                                                resource, resp);
                        }
                } else {
-                       ast_debug(1, "Resource %s (child of %s) is a list\n", resource, parent->resource);
+                       ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
                        current = tree_node_alloc(resource, visited, child_list->full_state);
                        if (!current) {
                                ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
@@ -897,7 +952,7 @@ static void build_node_children(struct ast_sip_endpoint *endpoint, const struct
                                ast_debug(1, "List %s had no successful children.\n", resource);
                                AST_VECTOR_APPEND(&parent->children, current);
                        } else {
-                               ast_debug(1, "List %s had successful children. Adding to parent %s\n",
+                               ast_debug(2, "List %s had successful children. Adding to parent %s\n",
                                                resource, parent->resource);
                                tree_node_destroy(current);
                        }
@@ -969,7 +1024,8 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a
        struct resources visited;
 
        if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
-               ast_debug(1, "Subscription to resource %s is not to a list\n", resource);
+               ast_debug(2, "Subscription '%s->%s' is not to a list\n",
+                       ast_sorcery_object_get_id(endpoint), resource);
                tree->root = tree_node_alloc(resource, NULL, 0);
                if (!tree->root) {
                        return 500;
@@ -977,7 +1033,8 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a
                return handler->notifier->new_subscribe(endpoint, resource);
        }
 
-       ast_debug(1, "Subscription to resource %s is a list\n", resource);
+       ast_debug(2, "Subscription '%s->%s' is a list\n",
+               ast_sorcery_object_get_id(endpoint), resource);
        if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
                return 500;
        }
@@ -1000,74 +1057,36 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a
        }
 }
 
-static int datastore_hash(const void *obj, int flags)
-{
-       const struct ast_datastore *datastore = obj;
-       const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
-
-       ast_assert(uid != NULL);
-
-       return ast_str_hash(uid);
-}
-
-static int datastore_cmp(void *obj, void *arg, int flags)
-{
-       const struct ast_datastore *datastore1 = obj;
-       const struct ast_datastore *datastore2 = arg;
-       const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
-
-       ast_assert(datastore1->uid != NULL);
-       ast_assert(uid2 != NULL);
-
-       return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
-}
-
-static int subscription_remove_serializer(void *obj)
-{
-       struct sip_subscription_tree *sub_tree = obj;
-
-       /* This is why we keep the dialog on the subscription. When the subscription
-        * is destroyed, there is no guarantee that the underlying dialog is ready
-        * to be destroyed. Furthermore, there's no guarantee in the opposite direction
-        * either. The dialog could be destroyed before our subscription is. We fix
-        * this problem by keeping a reference to the dialog until it is time to
-        * destroy the subscription. We need to have the dialog available when the
-        * subscription is destroyed so that we can guarantee that our attempt to
-        * remove the serializer will be successful.
-        */
-       ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
-       ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
-       pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
-
-       return 0;
-}
-
 static void add_subscription(struct sip_subscription_tree *obj)
 {
-       SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+       AST_RWLIST_WRLOCK(&subscriptions);
        AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
+       AST_RWLIST_UNLOCK(&subscriptions);
 }
 
 static void remove_subscription(struct sip_subscription_tree *obj)
 {
        struct sip_subscription_tree *i;
-       SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+
+       AST_RWLIST_WRLOCK(&subscriptions);
        AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
                if (i == obj) {
                        AST_RWLIST_REMOVE_CURRENT(next);
                        if (i->root) {
-                               ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
-                                               ast_sip_subscription_get_resource_name(i->root));
+                               ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n",
+                                       ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root));
                        }
                        break;
                }
        }
        AST_RWLIST_TRAVERSE_SAFE_END;
+       AST_RWLIST_UNLOCK(&subscriptions);
 }
 
 static void destroy_subscription(struct ast_sip_subscription *sub)
 {
-       ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
+       ast_debug(3, "Destroying SIP subscription from '%s->%s'\n",
+               ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource);
        ast_free(sub->body_text);
 
        AST_VECTOR_FREE(&sub->children);
@@ -1079,6 +1098,10 @@ static void destroy_subscriptions(struct ast_sip_subscription *root)
 {
        int i;
 
+       if (!root) {
+               return;
+       }
+
        for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
                struct ast_sip_subscription *child;
 
@@ -1101,7 +1124,7 @@ static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_s
        }
        strcpy(sub->resource, resource); /* Safe */
 
-       sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+       sub->datastores = ast_datastores_alloc();
        if (!sub->datastores) {
                destroy_subscription(sub);
                return NULL;
@@ -1192,22 +1215,41 @@ static void shutdown_subscriptions(struct ast_sip_subscription *sub)
                sub->handler->subscription_shutdown(sub);
        }
 }
+static int subscription_unreference_dialog(void *obj)
+{
+       struct sip_subscription_tree *sub_tree = obj;
+
+       /* This is why we keep the dialog on the subscription. When the subscription
+        * is destroyed, there is no guarantee that the underlying dialog is ready
+        * to be destroyed. Furthermore, there's no guarantee in the opposite direction
+        * either. The dialog could be destroyed before our subscription is. We fix
+        * this problem by keeping a reference to the dialog until it is time to
+        * destroy the subscription. We need to have the dialog available when the
+        * subscription is destroyed so that we can guarantee that our attempt to
+        * remove the serializer will be successful.
+        */
+       pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
+       sub_tree->dlg = NULL;
+
+       return 0;
+}
 
 static void subscription_tree_destructor(void *obj)
 {
        struct sip_subscription_tree *sub_tree = obj;
 
-       ast_debug(3, "Destroying subscription tree %p\n", sub_tree);
+       ast_debug(3, "Destroying subscription tree %p '%s->%s'\n",
+               sub_tree,
+               sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
+               sub_tree->root ? sub_tree->root->resource : "Unknown");
 
-       remove_subscription(sub_tree);
-
-       subscription_persistence_remove(sub_tree);
        ao2_cleanup(sub_tree->endpoint);
 
+       destroy_subscriptions(sub_tree->root);
+
        if (sub_tree->dlg) {
-               ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub_tree);
+               ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
        }
-       destroy_subscriptions(sub_tree->root);
 
        ast_taskprocessor_unreference(sub_tree->serializer);
        ast_module_unref(ast_module_info->self);
@@ -1215,23 +1257,21 @@ static void subscription_tree_destructor(void *obj)
 
 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
 {
-       ast_debug(3, "Removing subscription %p reference to subscription tree %p\n", sub, sub->tree);
+       ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n",
+               sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree);
        ao2_cleanup(sub->tree);
 }
 
 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
 {
-       /* We keep a reference to the dialog until our subscription is destroyed. See
-        * the subscription_destructor for more details
-        */
-       pjsip_dlg_inc_session(dlg, &pubsub_module);
        sub_tree->dlg = dlg;
        ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
        ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
        pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
+       pjsip_dlg_inc_session(dlg, &pubsub_module);
 }
 
-static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint)
+static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
 {
        struct sip_subscription_tree *sub_tree;
 
@@ -1242,7 +1282,24 @@ static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_e
 
        ast_module_ref(ast_module_info->self);
 
-       sub_tree->serializer = ast_sip_create_serializer();
+       if (rdata) {
+               /*
+                * We must continue using the serializer that the original
+                * SUBSCRIBE came in on for the dialog.  There may be
+                * retransmissions already enqueued in the original
+                * serializer that can result in reentrancy and message
+                * sequencing problems.
+                */
+               sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
+       } else {
+               char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+               /* Create name with seq number appended. */
+               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
+                       ast_sorcery_object_get_id(endpoint));
+
+               sub_tree->serializer = ast_sip_create_serializer(tps_name);
+       }
        if (!sub_tree->serializer) {
                ao2_ref(sub_tree, -1);
                return NULL;
@@ -1251,7 +1308,6 @@ static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_e
        sub_tree->endpoint = ao2_bump(endpoint);
        sub_tree->notify_sched_id = -1;
 
-       add_subscription(sub_tree);
        return sub_tree;
 }
 
@@ -1284,7 +1340,7 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s
        pjsip_dialog *dlg;
        struct subscription_persistence *persistence;
 
-       sub_tree = allocate_subscription_tree(endpoint);
+       sub_tree = allocate_subscription_tree(endpoint, rdata);
        if (!sub_tree) {
                *dlg_status = PJ_ENOMEM;
                return NULL;
@@ -1309,12 +1365,15 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s
                dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
                pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
                dlg->local.cseq = persistence->cseq;
-               dlg->remote.cseq = persistence->cseq;
        }
 
        pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
        subscription_setup_dialog(sub_tree, dlg);
 
+#ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
+       pjsip_evsub_add_ref(sub_tree->evsub);
+#endif
+
        ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
                        pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
 
@@ -1325,115 +1384,213 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s
                sub_tree->is_list = 1;
        }
 
+       add_subscription(sub_tree);
+
        return sub_tree;
 }
 
+/*! Wrapper structure for initial_notify_task */
+struct initial_notify_data {
+       struct sip_subscription_tree *sub_tree;
+       int expires;
+};
+
 static int initial_notify_task(void *obj);
 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
 
-/*! \brief Callback function to perform the actual recreation of a subscription */
-static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+/*! Persistent subscription recreation continuation under distributor serializer data */
+struct persistence_recreate_data {
+       struct subscription_persistence *persistence;
+       pjsip_rx_data *rdata;
+};
+
+/*!
+ * \internal
+ * \brief subscription_persistence_recreate continuation under distributor serializer.
+ * \since 13.10.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int sub_persistence_recreate(void *obj)
 {
-       struct subscription_persistence *persistence = obj;
-       pj_pool_t *pool = arg;
-       pjsip_rx_data rdata = { { 0, }, };
-       RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+       struct persistence_recreate_data *recreate_data = obj;
+       struct subscription_persistence *persistence = recreate_data->persistence;
+       pjsip_rx_data *rdata = recreate_data->rdata;
+       struct ast_sip_endpoint *endpoint;
        struct sip_subscription_tree *sub_tree;
        struct ast_sip_pubsub_body_generator *generator;
-       int resp;
+       struct ast_sip_subscription_handler *handler;
        char *resource;
-       size_t resource_size;
        pjsip_sip_uri *request_uri;
+       size_t resource_size;
+       int resp;
        struct resource_tree tree;
        pjsip_expires_hdr *expires_header;
-       struct ast_sip_subscription_handler *handler;
 
-       /* If this subscription has already expired remove it */
-       if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
-               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-               return 0;
-       }
+       request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
+       resource_size = pj_strlen(&request_uri->user) + 1;
+       resource = ast_alloca(resource_size);
+       ast_copy_pj_str(resource, &request_uri->user, resource_size);
 
-       endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
-       if (!endpoint) {
-               ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+       /*
+        * We may want to match without any user options getting
+        * in the way.
+        */
+       AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
+
+       handler = subscription_get_handler_from_rdata(rdata);
+       if (!handler || !handler->notifier) {
+               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
                        persistence->endpoint);
                ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
                return 0;
        }
 
-       pj_pool_reset(pool);
-       rdata.tp_info.pool = pool;
-
-       if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
-               persistence->transport_key, persistence->local_name, persistence->local_port)) {
-               ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+       generator = subscription_get_generator_from_rdata(rdata, handler);
+       if (!generator) {
+               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
                        persistence->endpoint);
                ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
                return 0;
        }
 
-       if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
-               ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n",
-                               ast_sorcery_object_get_id(endpoint));
+       ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
+               pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+       /* Getting the endpoint may take some time that can affect the expiration. */
+       endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
+               persistence->endpoint);
+       if (!endpoint) {
+               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
+                       persistence->endpoint);
                ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
                return 0;
        }
 
-       request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
-       resource_size = pj_strlen(&request_uri->user) + 1;
-       resource = ast_alloca(resource_size);
-       ast_copy_pj_str(resource, &request_uri->user, resource_size);
-
        /* Update the expiration header with the new expiration */
-       expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+       expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
+               rdata->msg_info.msg->hdr.next);
        if (!expires_header) {
-               expires_header = pjsip_expires_hdr_create(pool, 0);
+               expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
                if (!expires_header) {
+                       ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
+                               persistence->endpoint);
                        ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+                       ao2_ref(endpoint, -1);
                        return 0;
                }
-               pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
-       }
-       expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
-
-       handler = subscription_get_handler_from_rdata(&rdata);
-       if (!handler || !handler->notifier) {
-               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-               return 0;
+               pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
        }
 
-       generator = subscription_get_generator_from_rdata(&rdata, handler);
-       if (!generator) {
+       expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+       if (expires_header->ivalue <= 0) {
+               /* The subscription expired since we started recreating the subscription. */
+               ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
+                       persistence->endpoint, persistence->tag);
                ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+               ao2_ref(endpoint, -1);
                return 0;
        }
 
-       ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
-                       pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
-
        memset(&tree, 0, sizeof(tree));
        resp = build_resource_tree(endpoint, handler, resource, &tree,
-               ast_sip_pubsub_has_eventlist_support(&rdata));
+               ast_sip_pubsub_has_eventlist_support(rdata));
        if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
                pj_status_t dlg_status;
 
-               sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status);
+               sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
+                       &tree, &dlg_status);
                if (!sub_tree) {
-                       ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
-                       ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint);
-                       return 0;
-               }
-               sub_tree->persistence = ao2_bump(persistence);
-               subscription_persistence_update(sub_tree, &rdata);
-               if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
-                       pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
-                       ao2_ref(sub_tree, -1);
+                       if (dlg_status != PJ_EEXISTS) {
+                               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
+                                       persistence->endpoint);
+                               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+                       }
+               } else {
+                       struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
+
+                       if (!ind) {
+                               pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+                               goto error;
+                       }
+
+                       ind->sub_tree = ao2_bump(sub_tree);
+                       ind->expires = expires_header->ivalue;
+
+                       sub_tree->persistence = ao2_bump(persistence);
+                       subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED);
+                       if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
+                               /* Could not send initial subscribe NOTIFY */
+                               pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+                               ao2_ref(sub_tree, -1);
+                               ast_free(ind);
+                       }
                }
        } else {
                ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
        }
+
+error:
        resource_tree_destroy(&tree);
+       ao2_ref(endpoint, -1);
+
+       return 0;
+}
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+       struct subscription_persistence *persistence = obj;
+       pj_pool_t *pool = arg;
+       struct ast_taskprocessor *serializer;
+       pjsip_rx_data rdata;
+       struct persistence_recreate_data recreate_data;
+
+       /* If this subscription has already expired remove it */
+       if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+               ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
+                       persistence->endpoint, persistence->tag);
+               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+               return 0;
+       }
+
+       memset(&rdata, 0, sizeof(rdata));
+       pj_pool_reset(pool);
+       rdata.tp_info.pool = pool;
+
+       if (ast_sip_create_rdata_with_contact(&rdata, persistence->packet, persistence->src_name,
+               persistence->src_port, persistence->transport_key, persistence->local_name,
+               persistence->local_port, persistence->contact_uri)) {
+               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
+                       persistence->endpoint);
+               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+               return 0;
+       }
+
+       if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
+               ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
+                       persistence->endpoint);
+               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+               return 0;
+       }
+
+       /* Continue the remainder in the distributor serializer */
+       serializer = ast_sip_get_distributor_serializer(&rdata);
+       if (!serializer) {
+               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
+                       persistence->endpoint);
+               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+               return 0;
+       }
+       recreate_data.persistence = persistence;
+       recreate_data.rdata = &rdata;
+       if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
+               ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
+                       persistence->endpoint);
+               ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+       }
+       ast_taskprocessor_unreference(serializer);
 
        return 0;
 }
@@ -1493,18 +1650,19 @@ static int for_each_subscription(on_subscription_t on_subscription, void *arg)
 {
        int num = 0;
        struct sip_subscription_tree *i;
-       SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
 
        if (!on_subscription) {
                return num;
        }
 
+       AST_RWLIST_RDLOCK(&subscriptions);
        AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
                if (on_subscription(i, arg)) {
                        break;
                }
                ++num;
        }
+       AST_RWLIST_UNLOCK(&subscriptions);
        return num;
 }
 
@@ -1519,7 +1677,11 @@ static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
        ast_str_append(buf, 0, "Endpoint: %s\r\n",
                       ast_sorcery_object_get_id(sub_tree->endpoint));
 
-       ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
+       if (sub_tree->dlg) {
+               ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
+       } else {
+               ast_copy_string(str, "<unknown>", sizeof(str));
+       }
        ast_str_append(buf, 0, "Callid: %s\r\n", str);
 
        ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
@@ -1540,10 +1702,12 @@ static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
 
 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
 {
-       pjsip_dialog *dlg = sub->tree->dlg;
-       pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
+       pjsip_dialog *dlg;
+       pjsip_msg *msg;
        pj_str_t name;
 
+       dlg = sub->tree->dlg;
+       msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
        pj_cstr(&name, header);
 
        return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
@@ -1561,7 +1725,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
        pjsip_evsub *evsub;
        struct sip_subscription_tree *sub_tree = NULL;
 
-       sub_tree = allocate_subscription_tree(endpoint);
+       sub_tree = allocate_subscription_tree(endpoint, NULL);
        if (!sub_tree) {
                return NULL;
        }
@@ -1596,7 +1760,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
        evsub = sub_tree->evsub;
 
        if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
-               pjsip_evsub_send_request(evsub, tdata);
+               pjsip_evsub_send_request(sub_tree->evsub, tdata);
        } else {
                /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
                 * being called and terminating the subscription. Therefore, we don't
@@ -1607,9 +1771,17 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
                return NULL;
        }
 
+       add_subscription(sub_tree);
+
        return sub;
 }
 
+pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
+{
+       ast_assert(sub->tree->dlg != NULL);
+       return sub->tree->dlg;
+}
+
 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
 {
        ast_assert(sub->tree->endpoint != NULL);
@@ -1675,17 +1847,19 @@ static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree,
 {
 #ifdef TEST_FRAMEWORK
        struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
-#endif
        pjsip_evsub *evsub = sub_tree->evsub;
+#endif
        int res;
 
        if (allocate_tdata_buffer(tdata)) {
                ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
+               pjsip_tx_data_dec_ref(tdata);
                return -1;
        }
 
-       res = pjsip_evsub_send_request(evsub, tdata) == PJ_SUCCESS ? 0 : -1;
-       subscription_persistence_update(sub_tree, NULL);
+       res = pjsip_evsub_send_request(sub_tree->evsub, tdata);
+
+       subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST);
 
        ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
                "StateText: %s\r\n"
@@ -1693,7 +1867,7 @@ static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree,
                pjsip_evsub_get_state_name(evsub),
                ast_sorcery_object_get_id(endpoint));
 
-       return res;
+       return (res == PJ_SUCCESS ? 0 : -1);
 }
 
 /*!
@@ -2132,10 +2306,8 @@ static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int forc
                pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
        }
 
-       if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
-               sub_tree->last_notify = 1;
-       }
        if (sip_subscription_send_request(sub_tree, tdata)) {
+               /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
                return -1;
        }
 
@@ -2147,25 +2319,37 @@ static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int forc
 static int serialized_send_notify(void *userdata)
 {
        struct sip_subscription_tree *sub_tree = userdata;
+       pjsip_dialog *dlg = sub_tree->dlg;
+
+       pjsip_dlg_inc_lock(dlg);
 
-       pjsip_dlg_inc_lock(sub_tree->dlg);
        /* It's possible that between when the notification was scheduled
-        * and now, that a new SUBSCRIBE arrived, requiring full state to be
-        * sent out in an immediate NOTIFY. If that has happened, we need to
+        * and now a new SUBSCRIBE arrived requiring full state to be
+        * sent out in an immediate NOTIFY. It's also possible that we're
+        * already processing a terminate.  If that has happened, we need to
         * bail out here instead of sending the batched NOTIFY.
         */
-       if (!sub_tree->send_scheduled_notify) {
-               pjsip_dlg_dec_lock(sub_tree->dlg);
+
+       if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
+               || !sub_tree->send_scheduled_notify) {
+               pjsip_dlg_dec_lock(dlg);
                ao2_cleanup(sub_tree);
                return 0;
        }
 
+       if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
+               sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
+       }
+
        send_notify(sub_tree, 0);
-       ast_test_suite_event_notify("SUBSCRIPTION_STATE_CHANGED",
-                       "Resource: %s",
-                       sub_tree->root->resource);
+
+       ast_test_suite_event_notify(
+               sub_tree->state == SIP_SUB_TREE_TERMINATED
+                       ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
+               "Resource: %s", sub_tree->root->resource);
+
        sub_tree->notify_sched_id = -1;
-       pjsip_dlg_dec_lock(sub_tree->dlg);
+       pjsip_dlg_dec_lock(dlg);
        ao2_cleanup(sub_tree);
        return 0;
 }
@@ -2175,7 +2359,10 @@ static int sched_cb(const void *data)
        struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
 
        /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
-       ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree);
+       if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
+               ao2_cleanup(sub_tree);
+       }
+
        return 0;
 }
 
@@ -2186,12 +2373,13 @@ static int schedule_notification(struct sip_subscription_tree *sub_tree)
                return 0;
        }
 
+       sub_tree->send_scheduled_notify = 1;
        sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
        if (sub_tree->notify_sched_id < 0) {
+               ao2_cleanup(sub_tree);
                return -1;
        }
 
-       sub_tree->send_scheduled_notify = 1;
        return 0;
 }
 
@@ -2199,23 +2387,25 @@ int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip
                int terminate)
 {
        int res;
+       pjsip_dialog *dlg = sub->tree->dlg;
 
-       pjsip_dlg_inc_lock(sub->tree->dlg);
+       pjsip_dlg_inc_lock(dlg);
 
-       if (!sub->tree->evsub) {
-               pjsip_dlg_dec_lock(sub->tree->dlg);
+       if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
+               pjsip_dlg_dec_lock(dlg);
                return 0;
        }
 
        if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
                                ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
-               pjsip_dlg_dec_lock(sub->tree->dlg);
+               pjsip_dlg_dec_lock(dlg);
                return -1;
        }
 
        sub->body_changed = 1;
        if (terminate) {
                sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
+               sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
        }
 
        if (sub->tree->notification_batch_interval) {
@@ -2223,6 +2413,9 @@ int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip
        } else {
                /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
                ao2_ref(sub->tree, +1);
+               if (terminate) {
+                       sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
+               }
                res = send_notify(sub->tree, 0);
                ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
                                "Resource: %s",
@@ -2230,10 +2423,15 @@ int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip
                ao2_ref(sub->tree, -1);
        }
 
-       pjsip_dlg_dec_lock(sub->tree->dlg);
+       pjsip_dlg_dec_lock(dlg);
        return res;
 }
 
+pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
+{
+       return sub->uri;
+}
+
 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
 {
        pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
@@ -2241,7 +2439,9 @@ void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *
 
 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
 {
-       pjsip_dialog *dlg = sub->tree->dlg;
+       pjsip_dialog *dlg;
+
+       dlg = sub->tree->dlg;
        ast_copy_pj_str(buf, &dlg->remote.info_str, size);
 }
 
@@ -2273,92 +2473,49 @@ static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip
        return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
 }
 
-static void subscription_datastore_destroy(void *obj)
+struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
 {
-       struct ast_datastore *datastore = obj;
+       return ast_datastores_alloc_datastore(info, uid);
+}
 
-       /* Using the destroy function (if present) destroy the data */
-       if (datastore->info->destroy != NULL && datastore->data != NULL) {
-               datastore->info->destroy(datastore->data);
-               datastore->data = NULL;
-       }
+int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
+{
+       return ast_datastores_add(subscription->datastores, datastore);
+}
 
-       ast_free((void *) datastore->uid);
-       datastore->uid = NULL;
+struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
+{
+       return ast_datastores_find(subscription->datastores, name);
 }
 
-struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
+void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
 {
-       RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
-       char uuid_buf[AST_UUID_STR_LEN];
-       const char *uid_ptr = uid;
+       ast_datastores_remove(subscription->datastores, name);
+}
 
-       if (!info) {
-               return NULL;
-       }
+struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
+{
+       return subscription->datastores;
+}
 
-       datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
-       if (!datastore) {
-               return NULL;
-       }
+int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
+{
+       return ast_datastores_add(publication->datastores, datastore);
+}
 
-       datastore->info = info;
-       if (ast_strlen_zero(uid)) {
-               /* They didn't provide an ID so we'll provide one ourself */
-               uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
-       }
-
-       datastore->uid = ast_strdup(uid_ptr);
-       if (!datastore->uid) {
-               return NULL;
-       }
-
-       ao2_ref(datastore, +1);
-       return datastore;
-}
-
-int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
-{
-       ast_assert(datastore != NULL);
-       ast_assert(datastore->info != NULL);
-       ast_assert(!ast_strlen_zero(datastore->uid));
-
-       if (!ao2_link(subscription->datastores, datastore)) {
-               return -1;
-       }
-       return 0;
-}
-
-struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
-{
-       return ao2_find(subscription->datastores, name, OBJ_KEY);
-}
-
-void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
-{
-       ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
-}
-
-int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
+struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
 {
-       ast_assert(datastore != NULL);
-       ast_assert(datastore->info != NULL);
-       ast_assert(!ast_strlen_zero(datastore->uid));
-
-       if (!ao2_link(publication->datastores, datastore)) {
-               return -1;
-       }
-       return 0;
+       return ast_datastores_find(publication->datastores, name);
 }
 
-struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
+void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
 {
-       return ao2_find(publication->datastores, name, OBJ_KEY);
+       ast_datastores_remove(publication->datastores, name);
 }
 
-void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
+struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
 {
-       ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
+       return publication->datastores;
 }
 
 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
@@ -2383,8 +2540,9 @@ static int publication_cmp_fn(void *obj, void *arg, int flags)
 
 static void publish_add_handler(struct ast_sip_publish_handler *handler)
 {
-       SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+       AST_RWLIST_WRLOCK(&publish_handlers);
        AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
+       AST_RWLIST_UNLOCK(&publish_handlers);
 }
 
 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
@@ -2411,7 +2569,8 @@ int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
 {
        struct ast_sip_publish_handler *iter;
-       SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+
+       AST_RWLIST_WRLOCK(&publish_handlers);
        AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
                if (handler == iter) {
                        AST_RWLIST_REMOVE_CURRENT(next);
@@ -2421,27 +2580,30 @@ void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
                }
        }
        AST_RWLIST_TRAVERSE_SAFE_END;
+       AST_RWLIST_UNLOCK(&publish_handlers);
 }
 
 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
 
 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
 {
-       SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+       AST_RWLIST_WRLOCK(&subscription_handlers);
        AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
        ast_module_ref(ast_module_info->self);
+       AST_RWLIST_UNLOCK(&subscription_handlers);
 }
 
 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
 {
        struct ast_sip_subscription_handler *iter;
-       SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
 
+       AST_RWLIST_RDLOCK(&subscription_handlers);
        AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
                if (!strcmp(iter->event_name, event_name)) {
                        break;
                }
        }
+       AST_RWLIST_UNLOCK(&subscription_handlers);
        return iter;
 }
 
@@ -2459,8 +2621,9 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h
 
        existing = find_sub_handler_for_event_name(handler->event_name);
        if (existing) {
-               ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
-                               "A handler is already registered\n", handler->event_name);
+               ast_log(LOG_ERROR,
+                       "Unable to register subscription handler for event %s.  A handler is already registered\n",
+                       handler->event_name);
                return -1;
        }
 
@@ -2480,7 +2643,8 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h
 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
 {
        struct ast_sip_subscription_handler *iter;
-       SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+
+       AST_RWLIST_WRLOCK(&subscription_handlers);
        AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
                if (handler == iter) {
                        AST_RWLIST_REMOVE_CURRENT(next);
@@ -2489,22 +2653,31 @@ void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler
                }
        }
        AST_RWLIST_TRAVERSE_SAFE_END;
+       AST_RWLIST_UNLOCK(&subscription_handlers);
 }
 
-static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
-               const char *content_subtype)
+static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
 {
-       struct ast_sip_pubsub_body_generator *iter;
-       SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+       struct ast_sip_pubsub_body_generator *gen;
 
-       AST_LIST_TRAVERSE(&body_generators, iter, list) {
-               if (!strcmp(iter->type, content_type) &&
-                               !strcmp(iter->subtype, content_subtype)) {
+       AST_LIST_TRAVERSE(&body_generators, gen, list) {
+               if (!strcmp(gen->type, type)
+                       && !strcmp(gen->subtype, subtype)) {
                        break;
                }
-       };
+       }
 
-       return iter;
+       return gen;
+}
+
+static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
+{
+       struct ast_sip_pubsub_body_generator *gen;
+
+       AST_RWLIST_RDLOCK(&body_generators);
+       gen = find_body_generator_type_subtype_nolock(type, subtype);
+       AST_RWLIST_UNLOCK(&body_generators);
+       return gen;
 }
 
 static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
@@ -2585,21 +2758,45 @@ static int generate_initial_notify(struct ast_sip_subscription *sub)
        return res;
 }
 
+static int pubsub_on_refresh_timeout(void *userdata);
+
 static int initial_notify_task(void * obj)
 {
-       struct sip_subscription_tree *sub_tree;
+       struct initial_notify_data *ind = obj;
 
-       sub_tree = obj;
-       if (generate_initial_notify(sub_tree->root)) {
-               pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+       if (generate_initial_notify(ind->sub_tree->root)) {
+               pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE);
        } else {
-               send_notify(sub_tree, 1);
+               send_notify(ind->sub_tree, 1);
                ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
                        "Resource: %s",
-                       sub_tree->root->resource);
+                       ind->sub_tree->root->resource);
+       }
+
+       if (ind->expires > -1) {
+               char *name = ast_alloca(strlen("->/ ") +
+                       strlen(ind->sub_tree->persistence->endpoint) +
+                       strlen(ind->sub_tree->root->resource) +
+                       strlen(ind->sub_tree->root->handler->event_name) +
+                       ind->sub_tree->dlg->call_id->id.slen + 1);
+
+               sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint,
+                       ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name,
+                       (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr);
+
+               ast_debug(3, "Scheduling timer: %s\n", name);
+               ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer,
+                       ind->expires * 1000, pubsub_on_refresh_timeout, name,
+                       ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2);
+               if (!ind->sub_tree->expiration_task) {
+                       ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n",
+                               ind->expires, name);
+               }
        }
 
-       ao2_ref(sub_tree, -1);
+       ao2_ref(ind->sub_tree, -1);
+       ast_free(ind);
+
        return 0;
 }
 
@@ -2643,8 +2840,13 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
        resource = ast_alloca(resource_size);
        ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
 
-       expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
+       /*
+        * We may want to match without any user options getting
+        * in the way.
+        */
+       AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
 
+       expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
        if (expires_header) {
                if (expires_header->ivalue == 0) {
                        ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
@@ -2687,12 +2889,25 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
                }
        } else {
+               struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
+
+               if (!ind) {
+                       pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+                       resource_tree_destroy(&tree);
+                       return PJ_TRUE;
+               }
+
+               ind->sub_tree = ao2_bump(sub_tree);
+               /* Since this is a normal subscribe, pjproject takes care of the timer */
+               ind->expires = -1;
+
                sub_tree->persistence = subscription_persistence_create(sub_tree);
-               subscription_persistence_update(sub_tree, rdata);
+               subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);
                sip_subscription_accept(sub_tree, rdata, resp);
-               if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
+               if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
                        pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
                        ao2_ref(sub_tree, -1);
+                       ast_free(ind);
                }
        }
 
@@ -2703,8 +2918,8 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
 {
        struct ast_sip_publish_handler *iter = NULL;
-       SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
 
+       AST_RWLIST_RDLOCK(&publish_handlers);
        AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
                if (strcmp(event, iter->event_name)) {
                        ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
@@ -2713,6 +2928,7 @@ static struct ast_sip_publish_handler *find_pub_handler(const char *event)
                ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
                break;
        }
+       AST_RWLIST_UNLOCK(&publish_handlers);
 
        return iter;
 }
@@ -2772,7 +2988,7 @@ static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoin
                return NULL;
        }
 
-       if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
+       if (!(publication->datastores = ast_datastores_alloc())) {
                ao2_ref(publication, -1);
                return NULL;
        }
@@ -2793,7 +3009,6 @@ static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoin
 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
                pjsip_rx_data *rdata)
 {
-       pj_status_t status;
        pjsip_tx_data *tdata;
        pjsip_transaction *tsx;
 
@@ -2802,26 +3017,24 @@ static int sip_publication_respond(struct ast_sip_publication *pub, int status_c
        }
 
        if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
-               RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
-               RAII_VAR(char *, expires, NULL, ast_free_ptr);
+               char buf[30];
 
-               if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
-                       (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
-                       pjsip_tx_data_dec_ref(tdata);
-                       return -1;
-               }
+               snprintf(buf, sizeof(buf), "%d", pub->entity_tag);
+               ast_sip_add_header(tdata, "SIP-ETag", buf);
 
-               ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
-               ast_sip_add_header(tdata, "Expires", expires);
+               snprintf(buf, sizeof(buf), "%d", pub->expires);
+               ast_sip_add_header(tdata, "Expires", buf);
        }
 
-       if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
+       if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) {
+               pjsip_tx_data_dec_ref(tdata);
                return -1;
        }
 
        pjsip_tsx_recv_msg(tsx, rdata);
 
        if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
+               pjsip_tx_data_dec_ref(tdata);
                return -1;
        }
 
@@ -2856,13 +3069,22 @@ static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoi
        resource_name = ast_alloca(resource_size);
        ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
 
+       /*
+        * We may want to match without any user options getting
+        * in the way.
+        */
+       AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name);
+
        resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
        if (!resource) {
+               ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
                return NULL;
        }
 
        if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
+               ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
+                       resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
                return NULL;
        }
@@ -2874,6 +3096,7 @@ static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoi
        }
 
        if (!event_configuration_name) {
+               ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
                return NULL;
        }
@@ -3036,20 +3259,25 @@ const char *ast_sip_publication_get_event_configuration(const struct ast_sip_pub
        return pub->event_configuration_name;
 }
 
+int ast_sip_pubsub_is_body_generator_registered(const char *type, const char *subtype)
+{
+       return !!find_body_generator_type_subtype(type, subtype);
+}
+
 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
 {
        struct ast_sip_pubsub_body_generator *existing;
        pj_str_t accept;
        pj_size_t accept_len;
 
-       existing = find_body_generator_type_subtype(generator->type, generator->subtype);
+       AST_RWLIST_WRLOCK(&body_generators);
+       existing = find_body_generator_type_subtype_nolock(generator->type, generator->subtype);
        if (existing) {
-               ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
-                               "One is already registered.\n", generator->type, generator->subtype);
+               AST_RWLIST_UNLOCK(&body_generators);
+               ast_log(LOG_WARNING, "A body generator for %s/%s is already registered.\n",
+                       generator->type, generator->subtype);
                return -1;
        }
-
-       AST_RWLIST_WRLOCK(&body_generators);
        AST_LIST_INSERT_HEAD(&body_generators, generator, list);
        AST_RWLIST_UNLOCK(&body_generators);
 
@@ -3069,8 +3297,8 @@ int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator
 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
 {
        struct ast_sip_pubsub_body_generator *iter;
-       SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
 
+       AST_RWLIST_WRLOCK(&body_generators);
        AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
                if (iter == generator) {
                        AST_LIST_REMOVE_CURRENT(list);
@@ -3078,6 +3306,7 @@ void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generat
                }
        }
        AST_RWLIST_TRAVERSE_SAFE_END;
+       AST_RWLIST_UNLOCK(&body_generators);
 }
 
 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
@@ -3092,8 +3321,8 @@ int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplemen
 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
 {
        struct ast_sip_pubsub_body_supplement *iter;
-       SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
 
+       AST_RWLIST_WRLOCK(&body_supplements);
        AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
                if (iter == supplement) {
                        AST_LIST_REMOVE_CURRENT(list);
@@ -3101,6 +3330,7 @@ void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supple
                }
        }
        AST_RWLIST_TRAVERSE_SAFE_END;
+       AST_RWLIST_UNLOCK(&body_supplements);
 }
 
 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
@@ -3129,14 +3359,15 @@ int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
        }
 
        if (strcmp(data->body_type, generator->body_type)) {
-               ast_log(LOG_WARNING, "Body generator does not accept the type of data provided\n");
+               ast_log(LOG_WARNING, "%s/%s body generator does not accept the type of data provided\n",
+                       type, subtype);
                return -1;
        }
 
        body = generator->allocate_body(data->body_data);
        if (!body) {
-               ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
-                               type, subtype);
+               ast_log(LOG_WARNING, "%s/%s body generator could not to allocate a body\n",
+                       type, subtype);
                return -1;
        }
 
@@ -3190,124 +3421,175 @@ static void set_state_terminated(struct ast_sip_subscription *sub)
        }
 }
 
-/* XXX This function and serialized_pubsub_on_rx_refresh are nearly identical */
-static int serialized_pubsub_on_server_timeout(void *userdata)
-{
-       struct sip_subscription_tree *sub_tree = userdata;
-
-       pjsip_dlg_inc_lock(sub_tree->dlg);
-       if (!sub_tree->evsub) {
-               pjsip_dlg_dec_lock(sub_tree->dlg);
-               return 0;
-       }
-       set_state_terminated(sub_tree->root);
-       send_notify(sub_tree, 1);
-       ast_test_suite_event_notify("SUBSCRIPTION_TERMINATED",
-                       "Resource: %s",
-                       sub_tree->root->resource);
-
-       pjsip_dlg_dec_lock(sub_tree->dlg);
-       ao2_cleanup(sub_tree);
-       return 0;
-}
-
 /*!
- * \brief PJSIP callback when underlying SIP subscription changes state
- *
- * This callback is a bit of a mess, because it's not always called when
- * you might expect it to be, and it can be called multiple times for the
- * same state.
+ * \brief Callback sequence for subscription terminate:
  *
- * For instance, this function is not called at all when an incoming SUBSCRIBE
- * arrives to refresh a subscription. That makes sense in a way, since the
- * subscription state has not made a change; it was active and remains active.
+ * * Client initiated:
+ *     pjproject receives SUBSCRIBE on the subscription's serializer thread
+ *         calls pubsub_on_rx_refresh with dialog locked
+ *             pubsub_on_rx_refresh sets TERMINATE_PENDING
+ *             pushes serialized_pubsub_on_refresh_timeout
+ *             returns to pjproject
+ *         pjproject calls pubsub_on_evsub_state
+ *             pubsub_evsub_set_state checks state == TERMINATE_IN_PROGRESS (no)
+ *             ignore and return
+ *         pjproject unlocks dialog
+ *     serialized_pubsub_on_refresh_timeout starts (1)
+ *       locks dialog
+ *       checks state == TERMINATE_PENDING
+ *       sets TERMINATE_IN_PROGRESS
+ *       calls send_notify (2)
+ *           send_notify ultimately calls pjsip_evsub_send_request
+ *               pjsip_evsub_send_request calls evsub's set_state
+ *                   set_state calls pubsub_evsub_set_state
+ *                       pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS
+ *                       removes the subscriptions
+ *                       cleans up references to evsub
+ *                       sets state = TERMINATED
+ *       serialized_pubsub_on_refresh_timeout unlocks dialog
  *
- * However, if an incoming SUBSCRIBE arrives to end a subscription, then this
- * will be called into once upon receiving the SUBSCRIBE (after the call to
- * pubsub_on_rx_refresh) and again when sending a NOTIFY to end the subscription.
- * In both cases, the apparent state of the subscription is "terminated".
+ * * Subscription timer expires:
+ *     pjproject timer expires
+ *         locks dialog
+ *         calls pubsub_on_server_timeout
+ *             pubsub_on_server_timeout checks state == NORMAL
+ *             sets TERMINATE_PENDING
+ *             pushes serialized_pubsub_on_refresh_timeout
+ *             returns to pjproject
+ *         pjproject unlocks dialog
+ *     serialized_pubsub_on_refresh_timeout starts
+ *         See (1) Above
  *
- * However, the double-terminated state changes don't happen in all cases. For
- * instance, if a subscription expires, then the only time this callback is
- * called is when we send the NOTIFY to end the subscription.
+ * * Transmission failure sending NOTIFY or error response from client
+ *     pjproject transaction timer expires or non OK response
+ *         pjproject locks dialog
+ *         calls pubsub_on_evsub_state with event TSX_STATE
+ *             pubsub_on_evsub_state checks event == TSX_STATE
+ *             removes the subscriptions
+ *             cleans up references to evsub
+ *             sets state = TERMINATED
+ *         pjproject unlocks dialog
  *
- * As far as state changes are concerned, we only ever care about transitions
- * to the "terminated" state. The action we take here is dependent on the
- * conditions behind why the state change to "terminated" occurred. If the
- * state change has occurred because we are sending a NOTIFY to end the
- * subscription, we consider this to be the final hurrah of the subscription
- * and take measures to start shutting things down. If the state change to
- * terminated occurs for a different reason (e.g. transaction timeout,
- * incoming SUBSCRIBE to end the subscription), then we push a task to
- * send out a NOTIFY. When that NOTIFY is sent, this callback will be
- * called again and we will actually shut down the subscription. The
- * subscription tree's last_notify field let's us know if this is being
- * called as a result of a terminating NOTIFY or not.
+ * * ast_sip_subscription_notify is called
+ *       checks state == NORMAL
+ *       if not batched...
+ *           sets TERMINATE_IN_PROGRESS (if terminate is requested)
+ *           calls send_notify
+ *               See (2) Above
+ *       if batched...
+ *           sets TERMINATE_PENDING
+ *           schedules task
+ *       scheduler runs sched_task
+ *           sched_task pushes serialized_send_notify
+ *       serialized_send_notify starts
+ *           checks state <= TERMINATE_PENDING
+ *           if state == TERMINATE_PENDING set state = TERMINATE_IN_PROGRESS
+ *           call send_notify
+ *               See (2) Above
  *
- * There is no guarantee that this function will be called from a serializer
- * thread since it can be called due to a transaction timeout. Therefore
- * synchronization primitives are necessary to ensure that no operations
- * step on each others' toes. The dialog lock is always held when this
- * callback is called, so we ensure that relevant structures that may
- * be touched in this function are always protected by the dialog lock
- * elsewhere as well. The dialog lock in particular protects
+ */
+
+/*!
+ * \brief PJSIP callback when underlying SIP subscription changes state
  *
- * \li The subscription tree's last_notify field
- * \li The subscription tree's evsub pointer
+ * Although this function is called for every state change, we only care
+ * about the TERMINATED state, and only when we're actually processing the final
+ * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS) OR when a transmission failure
+ * occurs (PJSIP_EVENT_TSX_STATE).  In this case, we do all the subscription tree
+ * cleanup tasks and decrement the evsub reference.
  */
 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
 {
-       struct sip_subscription_tree *sub_tree;
+       struct sip_subscription_tree *sub_tree =
+               pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
 
-       ast_debug(3, "on_evsub_state called with state %s\n", pjsip_evsub_get_state_name(evsub));
+       ast_debug(3, "evsub %p state %s event %s sub_tree %p sub_tree state %s\n", evsub,
+               pjsip_evsub_get_state_name(evsub), pjsip_event_str(event->type), sub_tree,
+               (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
 
-       if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
+       if (!sub_tree || pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
                return;
        }
 
-       sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
-       if (!sub_tree) {
+       /* It's easier to write this as what we WANT to process, then negate it. */
+       if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS
+               || (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL)
+               )) {
+               ast_debug(3, "Do nothing.\n");
                return;
        }
 
-       if (!sub_tree->last_notify) {
-               if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_server_timeout, ao2_bump(sub_tree))) {
-                       ast_log(LOG_ERROR, "Failed to push task to send final NOTIFY.\n");
-                       ao2_ref(sub_tree, -1);
-               } else {
-                       return;
-               }
+       if (sub_tree->expiration_task) {
+               char task_name[256];
+
+               ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name));
+               ast_debug(3, "Cancelling timer: %s\n", task_name);
+               ast_sip_sched_task_cancel(sub_tree->expiration_task);
+               ao2_cleanup(sub_tree->expiration_task);
+               sub_tree->expiration_task = NULL;
        }
 
+       remove_subscription(sub_tree);
+
        pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
+
+#ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
+       pjsip_evsub_dec_ref(sub_tree->evsub);
+#endif
+
        sub_tree->evsub = NULL;
+
+       ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
+       ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
+
+       subscription_persistence_remove(sub_tree);
        shutdown_subscriptions(sub_tree->root);
+
+       sub_tree->state = SIP_SUB_TREE_TERMINATED;
        /* Remove evsub's reference to the sub_tree */
        ao2_ref(sub_tree, -1);
 }
 
-static int serialized_pubsub_on_rx_refresh(void *userdata)
+static int pubsub_on_refresh_timeout(void *userdata)
 {
        struct sip_subscription_tree *sub_tree = userdata;
+       pjsip_dialog *dlg = sub_tree->dlg;
 
-       pjsip_dlg_inc_lock(sub_tree->dlg);
-       if (!sub_tree->evsub) {
-               pjsip_dlg_dec_lock(sub_tree->dlg);
+       ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree,
+               (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
+
+       pjsip_dlg_inc_lock(dlg);
+       if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
+               pjsip_dlg_dec_lock(dlg);
                return 0;
        }
 
-       if (pjsip_evsub_get_state(sub_tree->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
+       if (sub_tree->state == SIP_SUB_TREE_TERMINATE_PENDING) {
+               sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
                set_state_terminated(sub_tree->root);
        }
 
        send_notify(sub_tree, 1);
 
        ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
-                       "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
-                       "Resource: %s", sub_tree->root->resource);
+                               "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
+                               "Resource: %s", sub_tree->root->resource);
 
-       pjsip_dlg_dec_lock(sub_tree->dlg);
+       pjsip_dlg_dec_lock(dlg);
+
+       return 0;
+}
+
+static int serialized_pubsub_on_refresh_timeout(void *userdata)
+{
+       struct sip_subscription_tree *sub_tree = userdata;
+
+       ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree,
+               (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
+
+       pubsub_on_refresh_timeout(userdata);
        ao2_cleanup(sub_tree);
+
        return 0;
 }
 
@@ -3317,10 +3599,8 @@ static int serialized_pubsub_on_rx_refresh(void *userdata)
  * This includes both SUBSCRIBE requests that actually refresh the subscription
  * as well as SUBSCRIBE requests that end the subscription.
  *
- * In the case where the SUBSCRIBE is actually refreshing the subscription we
- * push a task to send an appropriate NOTIFY request. In the case where the
- * SUBSCRIBE is ending the subscription, we let the pubsub_on_evsub_state
- * callback take care of sending the terminal NOTIFY request instead.
+ * In either case we push serialized_pubsub_on_refresh_timeout to send an
+ * appropriate NOTIFY request.
  */
 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
                int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
@@ -3328,18 +3608,38 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
        struct sip_subscription_tree *sub_tree;
 
        sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
-       if (!sub_tree) {
+       ast_debug(3, "evsub %p sub_tree %p sub_tree state %s\n", evsub, sub_tree,
+               (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
+
+       if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
                return;
        }
 
+       if (sub_tree->expiration_task) {
+               char task_name[256];
+
+               ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name));
+               ast_debug(3, "Cancelling timer: %s\n", task_name);
+               ast_sip_sched_task_cancel(sub_tree->expiration_task);
+               ao2_cleanup(sub_tree->expiration_task);
+               sub_tree->expiration_task = NULL;
+       }
+
        /* PJSIP will set the evsub's state to terminated before calling into this function
         * if the Expires value of the incoming SUBSCRIBE is 0.
         */
-       if (pjsip_evsub_get_state(sub_tree->evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
-               if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_rx_refresh, ao2_bump(sub_tree))) {
-                       /* If we can't push the NOTIFY refreshing task...we'll just go with it. */
-                       ao2_ref(sub_tree, -1);
-               }
+
+       if (pjsip_evsub_get_state(sub_tree->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
+               sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
+       }
+
+       subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_REFRESHED);
+
+       if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
+               /* If we can't push the NOTIFY refreshing task...we'll just go with it. */
+               ast_log(LOG_ERROR, "Failed to push task to send NOTIFY.\n");
+               sub_tree->state = SIP_SUB_TREE_NORMAL;
+               ao2_ref(sub_tree, -1);
        }
 
        if (sub_tree->is_list) {
@@ -3350,9 +3650,9 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
                pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
 {
-       struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+       struct ast_sip_subscription *sub;
 
-       if (!sub) {
+       if (!(sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
                return;
        }
 
@@ -3365,45 +3665,61 @@ static int serialized_pubsub_on_client_refresh(void *userdata)
        struct sip_subscription_tree *sub_tree = userdata;
        pjsip_tx_data *tdata;
 
+       if (!sub_tree->evsub) {
+               ao2_cleanup(sub_tree);
+               return 0;
+       }
+
        if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
                pjsip_evsub_send_request(sub_tree->evsub, tdata);
        } else {
                pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
        }
+
        ao2_cleanup(sub_tree);
        return 0;
 }
 
 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
 {
-       struct sip_subscription_tree *sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+       struct sip_subscription_tree *sub_tree;
+
+       if (!(sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
+               return;
+       }
 
-       ao2_ref(sub_tree, +1);
-       ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, sub_tree);
+       if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, ao2_bump(sub_tree))) {
+               ao2_cleanup(sub_tree);
+       }
 }
 
 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
 {
+       struct sip_subscription_tree *sub_tree;
 
-       struct sip_subscription_tree *sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
-       if (!sub_tree) {
-               /* PJSIP does not terminate the server timeout timer when a SUBSCRIBE
-                * with Expires: 0 arrives to end a subscription, nor does it terminate
-                * this timer when we send a NOTIFY request in response to receiving such
-                * a SUBSCRIBE. PJSIP does not stop the server timeout timer until the
-                * NOTIFY transaction has finished (either through receiving a response
-                * or through a transaction timeout).
-                *
-                * Therefore, it is possible that we can be told that a server timeout
-                * occurred after we already thought that the subscription had been
-                * terminated. In such a case, we will have already removed the sub_tree
-                * from the evsub's mod_data array.
-                */
+       /* PJSIP does not terminate the server timeout timer when a SUBSCRIBE
+        * with Expires: 0 arrives to end a subscription, nor does it terminate
+        * this timer when we send a NOTIFY request in response to receiving such
+        * a SUBSCRIBE. PJSIP does not stop the server timeout timer until the
+        * NOTIFY transaction has finished (either through receiving a response
+        * or through a transaction timeout).
+        *
+        * Therefore, it is possible that we can be told that a server timeout
+        * occurred after we already thought that the subscription had been
+        * terminated. In such a case, we will have already removed the sub_tree
+        * from the evsub's mod_data array.
+        */
+
+       sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+       if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
         return;
        }
 
-       ao2_ref(sub_tree, +1);
-       ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_server_timeout, sub_tree);
+       sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
+       if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
+               sub_tree->state = SIP_SUB_TREE_NORMAL;
+               ao2_cleanup(sub_tree);
+       }
 }
 
 static int ami_subscription_detail(struct sip_subscription_tree *sub_tree,
@@ -3420,6 +3736,8 @@ static int ami_subscription_detail(struct sip_subscription_tree *sub_tree,
        sip_subscription_to_ami(sub_tree, &buf);
        astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
        ast_free(buf);
+
+       ++ami->count;
        return 0;
 }
 
@@ -3438,14 +3756,13 @@ static int ami_subscription_detail_outbound(struct sip_subscription_tree *sub_tr
 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
 {
        struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
-       int num;
 
        astman_send_listack(s, m, "Following are Events for each inbound Subscription",
                "start");
 
-       num = for_each_subscription(ami_subscription_detail_inbound, &ami);
+       for_each_subscription(ami_subscription_detail_inbound, &ami);
 
-       astman_send_list_complete_start(s, m, "InboundSubscriptionDetailComplete", num);
+       astman_send_list_complete_start(s, m, "InboundSubscriptionDetailComplete", ami.count);
        astman_send_list_complete_end(s);
        return 0;
 }
@@ -3453,14 +3770,13 @@ static int ami_show_subscriptions_inbound(struct mansession *s, const struct mes
 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
 {
        struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
-       int num;
 
        astman_send_listack(s, m, "Following are Events for each outbound Subscription",
                "start");
 
-       num = for_each_subscription(ami_subscription_detail_outbound, &ami);
+       for_each_subscription(ami_subscription_detail_outbound, &ami);
 
-       astman_send_list_complete_start(s, m, "OutboundSubscriptionDetailComplete", num);
+       astman_send_list_complete_start(s, m, "OutboundSubscriptionDetailComplete", ami.count);
        astman_send_list_complete_end(s);
        return 0;
 }
@@ -3481,21 +3797,21 @@ static int format_ami_resource_lists(void *obj, void *arg, int flags)
                return CMP_STOP;
        }
        astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
-
        ast_free(buf);
+
+       ++ami->count;
        return 0;
 }
 
 static int ami_show_resource_lists(struct mansession *s, const struct message *m)
 {
        struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
-       int num;
        struct ao2_container *lists;
 
        lists = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "resource_list",
                        AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
 
-       if (!lists || !(num = ao2_container_count(lists))) {
+       if (!lists || !ao2_container_count(lists)) {
                astman_send_error(s, m, "No resource lists found\n");
                return 0;
        }
@@ -3505,7 +3821,7 @@ static int ami_show_resource_lists(struct mansession *s, const struct message *m
 
        ao2_callback(lists, OBJ_NODATA, format_ami_resource_lists, &ami);
 
-       astman_send_list_complete_start(s, m, "ResourceListDetailComplete", num);
+       astman_send_list_complete_start(s, m, "ResourceListDetailComplete", ami.count);
        astman_send_list_complete_end(s);
        return 0;
 }
@@ -3513,6 +3829,541 @@ static int ami_show_resource_lists(struct mansession *s, const struct message *m
 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
 
+#define MAX_REGEX_ERROR_LEN 128
+
+struct cli_sub_parms {
+       /*! CLI handler entry e parameter */
+       struct ast_cli_entry *e;
+       /*! CLI handler entry a parameter */
+       struct ast_cli_args *a;
+       /*! CLI subscription entry output line(s) */
+       struct ast_str *buf;
+       /*! Compiled regular expression to select if buf is written to CLI when not NULL. */
+       regex_t *like;
+       int count;
+};
+
+struct cli_sub_complete_parms {
+       struct ast_cli_args *a;
+       /*! Found callid for search position */
+       char *callid;
+       int wordlen;
+       int which;
+};
+
+static int cli_complete_subscription_common(struct sip_subscription_tree *sub_tree, struct cli_sub_complete_parms *cli)
+{
+       pj_str_t *callid;
+
+       if (!sub_tree->dlg) {
+               return 0;
+       }
+
+       callid = &sub_tree->dlg->call_id->id;
+       if (cli->wordlen <= pj_strlen(callid)
+               && !strncasecmp(cli->a->word, pj_strbuf(callid), cli->wordlen)
+               && (++cli->which > cli->a->n)) {
+               cli->callid = ast_malloc(pj_strlen(callid) + 1);
+               if (cli->callid) {
+                       ast_copy_pj_str(cli->callid, callid, pj_strlen(callid) + 1);
+               }
+               return -1;
+       }
+       return 0;
+}
+
+static int cli_complete_subscription_inbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_NOTIFIER
+               ? cli_complete_subscription_common(sub_tree, arg) : 0;
+}
+
+static int cli_complete_subscription_outbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_SUBSCRIBER
+               ? cli_complete_subscription_common(sub_tree, arg) : 0;
+}
+
+static char *cli_complete_subscription_callid(struct ast_cli_args *a)
+{
+       struct cli_sub_complete_parms cli;
+       on_subscription_t on_subscription;
+
+       if (a->pos != 4) {
+               return NULL;
+       }
+
+       if (!strcasecmp(a->argv[3], "inbound")) {
+               on_subscription = cli_complete_subscription_inbound;
+       } else if (!strcasecmp(a->argv[3], "outbound")) {
+               on_subscription = cli_complete_subscription_outbound;
+       } else {
+               /* Should never get here */
+               ast_assert(0);
+               return NULL;
+       }
+
+       cli.a = a;
+       cli.callid = NULL;
+       cli.wordlen = strlen(a->word);
+       cli.which = 0;
+       for_each_subscription(on_subscription, &cli);
+
+       return cli.callid;
+}
+
+static int cli_subscription_expiry(struct sip_subscription_tree *sub_tree)
+{
+       int expiry;
+
+       expiry = sub_tree->persistence
+               ? ast_tvdiff_ms(sub_tree->persistence->expires, ast_tvnow()) / 1000
+               : 0;
+       if (expiry < 0) {
+               /* Subscription expired */
+               expiry = 0;
+       }
+       return expiry;
+}
+
+static int cli_show_subscription_common(struct sip_subscription_tree *sub_tree, struct cli_sub_parms *cli)
+{
+       const char *callid = (const char *) cli->buf;/* Member repurposed to pass in callid */
+       pj_str_t *sub_callid;
+       struct ast_str *buf;
+       char *src;
+       char *dest;
+       char *key;
+       char *value;
+       char *value_end;
+       int key_len;
+       int key_filler_width;
+       int value_len;
+
+       if (!sub_tree->dlg) {
+               return 0;
+       }
+       sub_callid = &sub_tree->dlg->call_id->id;
+       if (pj_strcmp2(sub_callid, callid)) {
+               return 0;
+       }
+
+       buf = ast_str_create(512);
+       if (!buf) {
+               return -1;
+       }
+
+       ast_cli(cli->a->fd,
+               "%-20s: %s\n"
+               "===========================================================================\n",
+               "ParameterName", "ParameterValue");
+
+       ast_str_append(&buf, 0, "Resource: %s\n", sub_tree->root->resource);
+       ast_str_append(&buf, 0, "Event: %s\n", sub_tree->root->handler->event_name);
+       ast_str_append(&buf, 0, "Expiry: %d\n", cli_subscription_expiry(sub_tree));
+
+       sip_subscription_to_ami(sub_tree, &buf);
+
+       /* Convert AMI \r\n to \n line terminators. */
+       src = strchr(ast_str_buffer(buf), '\r');
+       if (src) {
+               dest = src;
+               ++src;
+               while (*src) {
+                       if (*src == '\r') {
+                               ++src;
+                               continue;
+                       }
+                       *dest++ = *src++;
+               }
+               *dest = '\0';
+               ast_str_update(buf);
+       }
+
+       /* Reformat AMI key value pairs to pretty columns */
+       key = ast_str_buffer(buf);
+       do {
+               value = strchr(key, ':');
+               if (!value) {
+                       break;
+               }
+               value_end = strchr(value, '\n');
+               if (!value_end) {
+                       break;
+               }
+
+               /* Calculate field lengths */
+               key_len = value - key;
+               key_filler_width = 20 - key_len;
+               if (key_filler_width < 0) {
+                       key_filler_width = 0;
+               }
+               value_len = value_end - value;
+
+               ast_cli(cli->a->fd, "%.*s%*s%.*s\n",
+                       key_len, key, key_filler_width, "",
+                       value_len, value);
+
+               key = value_end + 1;
+       } while (*key);
+       ast_cli(cli->a->fd, "\n");
+
+       ast_free(buf);
+
+       return -1;
+}
+
+static int cli_show_subscription_inbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_NOTIFIER
+               ? cli_show_subscription_common(sub_tree, arg) : 0;
+}
+
+static int cli_show_subscription_outbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_SUBSCRIBER
+               ? cli_show_subscription_common(sub_tree, arg) : 0;
+}
+
+static char *cli_show_subscription_inout(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       on_subscription_t on_subscription;
+       struct cli_sub_parms cli;
+
+       switch (cmd) {
+       case CLI_INIT:
+               e->command = "pjsip show subscription {inbound|outbound}";
+               e->usage = "Usage:\n"
+                                  "   pjsip show subscription inbound <call-id>\n"
+                                  "   pjsip show subscription outbound <call-id>\n"
+                                  "      Show active subscription with the dialog call-id\n";
+               return NULL;
+       case CLI_GENERATE:
+               return cli_complete_subscription_callid(a);
+       }
+
+       if (a->argc != 5) {
+               return CLI_SHOWUSAGE;
+       }
+
+       if (!strcasecmp(a->argv[3], "inbound")) {
+               on_subscription = cli_show_subscription_inbound;
+       } else if (!strcasecmp(a->argv[3], "outbound")) {
+               on_subscription = cli_show_subscription_outbound;
+       } else {
+               /* Should never get here */
+               ast_assert(0);
+               return NULL;
+       }
+
+       /* Find the subscription with the specified call-id */
+       cli.a = a;
+       cli.e = e;
+       cli.buf = (void *) a->argv[4];/* Repurpose the buf member to pass in callid */
+       for_each_subscription(on_subscription, &cli);
+
+       return CLI_SUCCESS;
+}
+
+#define CLI_SHOW_SUB_FORMAT_HEADER \
+       "Endpoint: <Endpoint/Caller-ID.............................................>\n" \
+       "Resource: <Resource/Event.................................................>\n" \
+       "  Expiry: <Expiry>  <Call-id..............................................>\n" \
+       "===========================================================================\n\n"
+#define CLI_SHOW_SUB_FORMAT_ENTRY  \
+       "Endpoint: %s/%s\n" \
+       "Resource: %s/%s\n" \
+       "  Expiry: %8d  %s\n\n"
+
+static int cli_show_subscriptions_detail(struct sip_subscription_tree *sub_tree, struct cli_sub_parms *cli)
+{
+       char caller_id[256];
+       char callid[256];
+
+       ast_callerid_merge(caller_id, sizeof(caller_id),
+               S_COR(sub_tree->endpoint->id.self.name.valid,
+                       sub_tree->endpoint->id.self.name.str, NULL),
+               S_COR(sub_tree->endpoint->id.self.number.valid,
+                       sub_tree->endpoint->id.self.number.str, NULL),
+               "<none>");
+
+       /* Call-id */
+       if (sub_tree->dlg) {
+               ast_copy_pj_str(callid, &sub_tree->dlg->call_id->id, sizeof(callid));
+       } else {
+               ast_copy_string(callid, "<unknown>", sizeof(callid));
+       }
+
+       ast_str_set(&cli->buf, 0, CLI_SHOW_SUB_FORMAT_ENTRY,
+               ast_sorcery_object_get_id(sub_tree->endpoint), caller_id,
+               sub_tree->root->resource, sub_tree->root->handler->event_name,
+               cli_subscription_expiry(sub_tree), callid);
+
+       if (cli->like) {
+               if (regexec(cli->like, ast_str_buffer(cli->buf), 0, NULL, 0)) {
+                       /* Output line did not match the regex */
+                       return 0;
+               }
+       }
+
+       ast_cli(cli->a->fd, "%s", ast_str_buffer(cli->buf));
+       ++cli->count;
+
+       return 0;
+}
+
+static int cli_show_subscriptions_inbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_NOTIFIER
+               ? cli_show_subscriptions_detail(sub_tree, arg) : 0;
+}
+
+static int cli_show_subscriptions_outbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_SUBSCRIBER
+               ? cli_show_subscriptions_detail(sub_tree, arg) : 0;
+}
+
+static char *cli_show_subscriptions_inout(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       on_subscription_t on_subscription;
+       struct cli_sub_parms cli;
+       regex_t like;
+       const char *regex;
+
+       switch (cmd) {
+       case CLI_INIT:
+               e->command = "pjsip show subscriptions {inbound|outbound} [like]";
+               e->usage = "Usage:\n"
+                                  "   pjsip show subscriptions inbound [like <regex>]\n"
+                                  "      Show active inbound subscriptions\n"
+                                  "   pjsip show subscriptions outbound [like <regex>]\n"
+                                  "      Show active outbound subscriptions\n"
+                                  "\n"
+                                  "   The regex selects a subscriptions output that matches.\n"
+                                  "   i.e.,  All output lines for a subscription are checked\n"
+                                  "   as a block by the regex.\n";
+               return NULL;
+       case CLI_GENERATE:
+               return NULL;
+       }
+
+       if (a->argc != 4 && a->argc != 6) {
+               return CLI_SHOWUSAGE;
+       }
+       if (!strcasecmp(a->argv[3], "inbound")) {
+               on_subscription = cli_show_subscriptions_inbound;
+       } else if (!strcasecmp(a->argv[3], "outbound")) {
+               on_subscription = cli_show_subscriptions_outbound;
+       } else {
+               /* Should never get here */
+               ast_assert(0);
+               return CLI_SHOWUSAGE;
+       }
+       if (a->argc == 6) {
+               int rc;
+
+               if (strcasecmp(a->argv[4], "like")) {
+                       return CLI_SHOWUSAGE;
+               }
+
+               /* Setup regular expression */
+               memset(&like, 0, sizeof(like));
+               cli.like = &like;
+               regex = a->argv[5];
+               rc = regcomp(cli.like, regex, REG_EXTENDED | REG_NOSUB);
+               if (rc) {
+                       char *regerr = ast_alloca(MAX_REGEX_ERROR_LEN);
+
+                       regerror(rc, cli.like, regerr, MAX_REGEX_ERROR_LEN);
+                       ast_cli(a->fd, "Regular expression '%s' failed to compile: %s\n",
+                               regex, regerr);
+                       return CLI_FAILURE;
+               }
+       } else {
+               cli.like = NULL;
+               regex = NULL;
+       }
+
+       cli.a = a;
+       cli.e = e;
+       cli.count = 0;
+       cli.buf = ast_str_create(256);
+       if (!cli.buf) {
+               if (cli.like) {
+                       regfree(cli.like);
+               }
+               return CLI_FAILURE;
+       }
+
+       ast_cli(a->fd, CLI_SHOW_SUB_FORMAT_HEADER);
+       for_each_subscription(on_subscription, &cli);
+       ast_cli(a->fd, "%d active subscriptions%s%s%s\n",
+               cli.count,
+               regex ? " matched \"" : "",
+               regex ?: "",
+               regex ? "\"" : "");
+
+       ast_free(cli.buf);
+       if (cli.like) {
+               regfree(cli.like);
+       }
+
+       return CLI_SUCCESS;
+}
+
+#define CLI_LIST_SUB_FORMAT_HEADER "%-30.30s %-30.30s %6.6s %s\n"
+#define CLI_LIST_SUB_FORMAT_ENTRY  "%-30.30s %-30.30s %6d %s\n"
+
+static int cli_list_subscriptions_detail(struct sip_subscription_tree *sub_tree, struct cli_sub_parms *cli)
+{
+       char ep_cid_buf[50];
+       char res_evt_buf[50];
+       char callid[256];
+
+       /* Endpoint/CID column */
+       snprintf(ep_cid_buf, sizeof(ep_cid_buf), "%s/%s",
+               ast_sorcery_object_get_id(sub_tree->endpoint),
+               S_COR(sub_tree->endpoint->id.self.name.valid, sub_tree->endpoint->id.self.name.str,
+                       S_COR(sub_tree->endpoint->id.self.number.valid,
+                               sub_tree->endpoint->id.self.number.str, "<none>")));
+
+       /* Resource/Event column */
+       snprintf(res_evt_buf, sizeof(res_evt_buf), "%s/%s",
+               sub_tree->root->resource,
+               sub_tree->root->handler->event_name);
+
+       /* Call-id column */
+       if (sub_tree->dlg) {
+               ast_copy_pj_str(callid, &sub_tree->dlg->call_id->id, sizeof(callid));
+       } else {
+               ast_copy_string(callid, "<unknown>", sizeof(callid));
+       }
+
+       ast_str_set(&cli->buf, 0, CLI_LIST_SUB_FORMAT_ENTRY,
+               ep_cid_buf,
+               res_evt_buf,
+               cli_subscription_expiry(sub_tree),
+               callid);
+
+       if (cli->like) {
+               if (regexec(cli->like, ast_str_buffer(cli->buf), 0, NULL, 0)) {
+                       /* Output line did not match the regex */
+                       return 0;
+               }
+       }
+
+       ast_cli(cli->a->fd, "%s", ast_str_buffer(cli->buf));
+       ++cli->count;
+
+       return 0;
+}
+
+static int cli_list_subscriptions_inbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_NOTIFIER
+               ? cli_list_subscriptions_detail(sub_tree, arg) : 0;
+}
+
+static int cli_list_subscriptions_outbound(struct sip_subscription_tree *sub_tree, void *arg)
+{
+       return sub_tree->role == AST_SIP_SUBSCRIBER
+               ? cli_list_subscriptions_detail(sub_tree, arg) : 0;
+}
+
+static char *cli_list_subscriptions_inout(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       on_subscription_t on_subscription;
+       struct cli_sub_parms cli;
+       regex_t like;
+       const char *regex;
+
+       switch (cmd) {
+       case CLI_INIT:
+               e->command = "pjsip list subscriptions {inbound|outbound} [like]";
+               e->usage = "Usage:\n"
+                                  "   pjsip list subscriptions inbound [like <regex>]\n"
+                                  "      List active inbound subscriptions\n"
+                                  "   pjsip list subscriptions outbound [like <regex>]\n"
+                                  "      List active outbound subscriptions\n"
+                                  "\n"
+                                  "   The regex selects output lines that match.\n";
+               return NULL;
+       case CLI_GENERATE:
+               return NULL;
+       }
+
+       if (a->argc != 4 && a->argc != 6) {
+               return CLI_SHOWUSAGE;
+       }
+       if (!strcasecmp(a->argv[3], "inbound")) {
+               on_subscription = cli_list_subscriptions_inbound;
+       } else if (!strcasecmp(a->argv[3], "outbound")) {
+               on_subscription = cli_list_subscriptions_outbound;
+       } else {
+               /* Should never get here */
+               ast_assert(0);
+               return CLI_SHOWUSAGE;
+       }
+       if (a->argc == 6) {
+               int rc;
+
+               if (strcasecmp(a->argv[4], "like")) {
+                       return CLI_SHOWUSAGE;
+               }
+
+               /* Setup regular expression */
+               memset(&like, 0, sizeof(like));
+               cli.like = &like;
+               regex = a->argv[5];
+               rc = regcomp(cli.like, regex, REG_EXTENDED | REG_NOSUB);
+               if (rc) {
+                       char *regerr = ast_alloca(MAX_REGEX_ERROR_LEN);
+
+                       regerror(rc, cli.like, regerr, MAX_REGEX_ERROR_LEN);
+                       ast_cli(a->fd, "Regular expression '%s' failed to compile: %s\n",
+                               regex, regerr);
+                       return CLI_FAILURE;
+               }
+       } else {
+               cli.like = NULL;
+               regex = NULL;
+       }
+
+       cli.a = a;
+       cli.e = e;
+       cli.count = 0;
+       cli.buf = ast_str_create(256);
+       if (!cli.buf) {
+               if (cli.like) {
+                       regfree(cli.like);
+               }
+               return CLI_FAILURE;
+       }
+
+       ast_cli(a->fd, CLI_LIST_SUB_FORMAT_HEADER,
+               "Endpoint/CLI", "Resource/Event", "Expiry", "Call-id");
+       for_each_subscription(on_subscription, &cli);
+       ast_cli(a->fd, "\n%d active subscriptions%s%s%s\n",
+               cli.count,
+               regex ? " matched \"" : "",
+               regex ?: "",
+               regex ? "\"" : "");
+
+       ast_free(cli.buf);
+       if (cli.like) {
+               regfree(cli.like);
+       }
+
+       return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli_commands[] = {
+       AST_CLI_DEFINE(cli_list_subscriptions_inout, "List active inbound/outbound subscriptions"),
+       AST_CLI_DEFINE(cli_show_subscription_inout, "Show active subscription details"),
+       AST_CLI_DEFINE(cli_show_subscriptions_inout, "Show active inbound/outbound subscriptions"),
+};
+
 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
 {
        struct subscription_persistence *persistence = obj;
@@ -3608,7 +4459,11 @@ static int list_item_handler(const struct aco_option *opt,
        char *items = ast_strdupa(var->value);
        char *item;
 
-       while ((item = strsep(&items, ","))) {
+       while ((item = ast_strip(strsep(&items, ",")))) {
+               if (ast_strlen_zero(item)) {
+                       continue;
+               }
+
                if (item_in_vector(list, item)) {
                        ast_log(LOG_WARNING, "Ignoring duplicated list item '%s'\n", item);
                        continue;
@@ -4370,13 +5225,13 @@ static int load_module(void)
 
        if (!(sched = ast_sched_context_create())) {
                ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
 
        if (ast_sched_start_thread(sched)) {
                ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
                ast_sched_context_destroy(sched);
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
 
        pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
@@ -4384,7 +5239,7 @@ static int load_module(void)
        if (ast_sip_register_service(&pubsub_module)) {
                ast_log(LOG_ERROR, "Could not register pubsub service\n");
                ast_sched_context_destroy(sched);
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
 
        ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
@@ -4394,7 +5249,7 @@ static int load_module(void)
                ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
                ast_sip_unregister_service(&pubsub_module);
                ast_sched_context_destroy(sched);
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
        ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
                CHARFLDSET(struct subscription_persistence, packet));
@@ -4416,11 +5271,13 @@ static int load_module(void)
                persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
        ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
                persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
+       ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0,
+               CHARFLDSET(struct subscription_persistence, contact_uri));
 
        if (apply_list_configuration(sorcery)) {
                ast_sip_unregister_service(&pubsub_module);
                ast_sched_context_destroy(sched);
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
 
        ast_sorcery_apply_default(sorcery, "inbound-publication", "config", "pjsip.conf,criteria=type=inbound-publication");
@@ -4429,7 +5286,7 @@ static int load_module(void)
                ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
                ast_sip_unregister_service(&pubsub_module);
                ast_sched_context_destroy(sched);
-               return AST_MODULE_LOAD_FAILURE;
+               return AST_MODULE_LOAD_DECLINE;
        }
        ast_sorcery_object_field_register(sorcery, "inbound-publication", "type", "", OPT_NOOP_T, 0, 0);
        ast_sorcery_object_field_register_custom(sorcery, "inbound-publication", "endpoint", "",
@@ -4450,6 +5307,8 @@ static int load_module(void)
        ast_manager_register_xml("PJSIPShowResourceLists", EVENT_FLAG_SYSTEM,
                        ami_show_resource_lists);
 
+       ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
        AST_TEST_REGISTER(resource_tree);
        AST_TEST_REGISTER(complex_resource_tree);
        AST_TEST_REGISTER(bad_resource);
@@ -4463,6 +5322,16 @@ static int load_module(void)
 
 static int unload_module(void)
 {
+       AST_TEST_UNREGISTER(resource_tree);
+       AST_TEST_UNREGISTER(complex_resource_tree);
+       AST_TEST_UNREGISTER(bad_resource);
+       AST_TEST_UNREGISTER(bad_branch);
+       AST_TEST_UNREGISTER(duplicate_resource);
+       AST_TEST_UNREGISTER(loop);
+       AST_TEST_UNREGISTER(bad_event);
+
+       ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
+
        ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
        ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
        ast_manager_unregister("PJSIPShowResourceLists");
@@ -4472,14 +5341,6 @@ static int unload_module(void)
                ast_sched_context_destroy(sched);
        }
 
-       AST_TEST_UNREGISTER(resource_tree);
-       AST_TEST_UNREGISTER(complex_resource_tree);
-       AST_TEST_UNREGISTER(bad_resource);
-       AST_TEST_UNREGISTER(bad_branch);
-       AST_TEST_UNREGISTER(duplicate_resource);
-       AST_TEST_UNREGISTER(loop);
-       AST_TEST_UNREGISTER(bad_event);
-
        return 0;
 }