res_fax.c: Add chan locked precondition comments.
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
index 856d84a..0aad5fc 100644 (file)
 #include <pjsip.h>
 #include <pjsip_simple.h>
 
+#include "asterisk/res_pjproject.h"
 #include "asterisk/res_pjsip.h"
 #include "asterisk/res_pjsip_outbound_publish.h"
 #include "asterisk/module.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/datastore.h"
 
 /*** DOCUMENTATION
                                <configOption name="max_auth_attempts" default="5">
                                        <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
                                </configOption>
+                               <configOption name="transport">
+                                       <synopsis>Transport used for outbound publish</synopsis>
+                                       <description>
+                                               <note><para>A <replaceable>transport</replaceable> configured in
+                                               <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
+                                       </description>
+                               </configOption>
+                               <configOption name="multi_user" default="no">
+                                       <synopsis>Enable multi-user support</synopsis>
+                                       <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
+                               </configOption>
                                <configOption name="type">
                                        <synopsis>Must be of type 'outbound-publish'.</synopsis>
                                </configOption>
        </configInfo>
  ***/
 
+static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE;
+
 /*! \brief Queued outbound publish message */
 struct sip_outbound_publish_message {
        /*! \brief Optional body */
@@ -105,6 +120,39 @@ struct sip_outbound_publish_message {
        char body_contents[0];
 };
 
+/*
+ * A note about some of the object types used in this module:
+ *
+ * The reason we currently have 4 separate object types that relate to configuration,
+ * publishing, state, and client information is due to object lifetimes and order of
+ * destruction dependencies.
+ *
+ * Separation of concerns is a good thing and of course it makes sense to have a
+ * configuration object type as well as an object type wrapper around pjsip's publishing
+ * client class. There also may be run time state data that needs to be tracked, so
+ * again having something to handle that is prudent. However, it may be tempting to think
+ * "why not combine the state and client object types?" Especially seeing as how they have
+ * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit
+ * more awkward.
+ *
+ * Currently this module maintains a global container of current state objects. When this
+ * states container is replaced, or deleted, it un-references all contained objects. Any
+ * state with a reference left have probably been carried over from a reload/realtime fetch.
+ * States not carried over are destructed and the associated client (and all its publishers)
+ * get unpublished.
+ *
+ * This "unpublishing" goes through a careful process of unpublishing the client, all its
+ * publishers, and making sure all the appropriate references are removed in a sane order.
+ * This process is essentially kicked off with the destruction of the state. If the state
+ * and client objects were to be merged, where clients became the globally tracked object
+ * type, this "unpublishing" process would never start because of the multiple references
+ * held to the client object over it's lifetime. Meaning the global tracking container
+ * would remove its reference to the client object when done with it, but other sources
+ * would still be holding a reference to it (namely the datastore and publisher(s)).
+ *
+ * Thus at this time it is easier to keep them separate.
+ */
+
 /*! \brief Outbound publish information */
 struct ast_sip_outbound_publish {
        /*! \brief Sorcery object details */
@@ -117,6 +165,8 @@ struct ast_sip_outbound_publish {
                AST_STRING_FIELD(from_uri);
                /*! \brief URI for the To header */
                AST_STRING_FIELD(to_uri);
+               /*! \brief Explicit transport to use for publish */
+               AST_STRING_FIELD(transport);
                /*! \brief Outbound proxy to use */
                AST_STRING_FIELD(outbound_proxy);
                /*! \brief The event type to publish */
@@ -128,28 +178,49 @@ struct ast_sip_outbound_publish {
        unsigned int max_auth_attempts;
        /*! \brief Configured authentication credentials */
        struct ast_sip_auth_vector outbound_auths;
+       /*! \brief The publishing client is used for multiple users when true */
+       unsigned int multi_user;
 };
 
-/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
-struct ast_sip_outbound_publish_client {
+struct sip_outbound_publisher {
+       /*! \brief The client object that 'owns' this client
+
+            \note any potential circular reference problems are accounted
+            for (see publisher alloc for more information)
+       */
+       struct ast_sip_outbound_publish_client *owner;
        /*! \brief Underlying publish client */
        pjsip_publishc *client;
+       /*! \brief The From URI for this specific publisher */
+       char *from_uri;
+       /*! \brief The To URI for this specific publisher */
+       char *to_uri;
        /*! \brief Timer entry for refreshing publish */
        pj_timer_entry timer;
-       /*! \brief Publisher datastores set up by handlers */
-       struct ao2_container *datastores;
        /*! \brief The number of auth attempts done */
        unsigned int auth_attempts;
        /*! \brief Queue of outgoing publish messages to send*/
        AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
        /*! \brief The message currently being sent */
        struct sip_outbound_publish_message *sending;
-       /*! \brief Publish client has been fully started and event type informed */
-       unsigned int started;
        /*! \brief Publish client should be destroyed */
        unsigned int destroy;
+       /*! \brief Serializer for stuff and things */
+       struct ast_taskprocessor *serializer;
+       /*! \brief User, if any, associated with the publisher */
+       char user[0];
+};
+
+/*! \brief Outbound publish client state information (persists for lifetime of a publish) */
+struct ast_sip_outbound_publish_client {
        /*! \brief Outbound publish information */
        struct ast_sip_outbound_publish *publish;
+       /*! \brief Publisher datastores set up by handlers */
+       struct ao2_container *datastores;
+       /*! \brief Container of all the client publishing objects */
+       struct ao2_container *publishers;
+       /*! \brief Publishing has been fully started and event type informed */
+       unsigned int started;
 };
 
 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
@@ -160,13 +231,25 @@ struct ast_sip_outbound_publish_state {
        char id[0];
 };
 
-/*! \brief Unloading data */
-struct unloading_data {
-       int is_unloading;
-       int count;
-       ast_mutex_t lock;
-       ast_cond_t cond;
-} unloading;
+/*!
+ * \brief Used for locking while loading/reloading
+ *
+ * Mutli-user configurations make it so publishers can be dynamically added and
+ * removed. Publishers should not be added or removed during a [re]load since
+ * it could cause the current_clients container to be out of sync. Thus the
+ * reason for this lock.
+ */
+AST_RWLOCK_DEFINE_STATIC(load_lock);
+
+#define DEFAULT_PUBLISHER_BUCKETS 119
+AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user);
+AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user);
+
+/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
+#define MAX_UNLOAD_TIMEOUT_TIME                35      /* Seconds */
+
+/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
+static struct ast_serializer_shutdown_group *shutdown_group;
 
 /*! \brief Default number of client state container buckets */
 #define DEFAULT_STATE_BUCKETS 31
@@ -227,6 +310,7 @@ static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
 static struct ao2_container *get_publishes_and_update_state(void)
 {
        struct ao2_container *container;
+       SCOPED_WRLOCK(lock, &load_lock);
 
        container = ast_sorcery_retrieve_by_fields(
                ast_sip_get_sorcery(), "outbound-publish",
@@ -263,22 +347,22 @@ static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_
        return iter;
 }
 
-/*! \brief Helper function which cancels the refresh timer on a client */
-static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
+/*! \brief Helper function which cancels the refresh timer on a publisher */
+static void cancel_publish_refresh(struct sip_outbound_publisher *publisher)
 {
-       if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
-               /* The timer was successfully cancelled, drop the refcount of the client */
-               ao2_ref(client, -1);
+       if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) {
+               /* The timer was successfully cancelled, drop the refcount of the publisher */
+               ao2_ref(publisher, -1);
        }
 }
 
 /*! \brief Helper function which sets up the timer to send publication */
-static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration)
+static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration)
 {
-       struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
+       struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish);
        pj_time_val delay = { .sec = 0, };
 
-       cancel_publish_refresh(client);
+       cancel_publish_refresh(publisher);
 
        if (expiration > 0) {
                delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
@@ -290,55 +374,83 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *cli
                delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
        }
 
-       ao2_ref(client, +1);
-       if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
+       ao2_ref(publisher, +1);
+       if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) {
                ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
-               ao2_ref(client, -1);
+               ao2_ref(publisher, -1);
        }
        ao2_ref(publish, -1);
 }
 
+static int publisher_client_send(void *obj, void *arg, void *data, int flags);
+
 /*! \brief Publish client timer callback function */
 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
 {
-       struct ast_sip_outbound_publish_client *client = entry->user_data;
+       struct sip_outbound_publisher *publisher = entry->user_data;
 
-       ao2_lock(client);
-       if (AST_LIST_EMPTY(&client->queue)) {
+       ao2_lock(publisher);
+       if (AST_LIST_EMPTY(&publisher->queue)) {
+               int res;
                /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
-               ast_sip_publish_client_send(client, NULL);
+               publisher_client_send(publisher, NULL, &res, 0);
        }
-       ao2_unlock(client);
+       ao2_unlock(publisher);
 
-       ao2_ref(client, -1);
+       ao2_ref(publisher, -1);
 }
 
 /*! \brief Task for cancelling a refresh timer */
 static int cancel_refresh_timer_task(void *data)
 {
-       struct ast_sip_outbound_publish_client *client = data;
+       struct sip_outbound_publisher *publisher = data;
 
-       cancel_publish_refresh(client);
-       ao2_ref(client, -1);
+       cancel_publish_refresh(publisher);
+       ao2_ref(publisher, -1);
 
        return 0;
 }
 
+static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata)
+{
+       if (!ast_strlen_zero(publisher->owner->publish->transport)) {
+               pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
+               ast_sip_set_tpselector_from_transport_name(
+                       publisher->owner->publish->transport, &selector);
+               pjsip_tx_data_set_transport(tdata, &selector);
+       }
+}
+
 /*! \brief Task for sending an unpublish */
 static int send_unpublish_task(void *data)
 {
-       struct ast_sip_outbound_publish_client *client = data;
+       struct sip_outbound_publisher *publisher = data;
        pjsip_tx_data *tdata;
 
-       if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
-               pjsip_publishc_send(client->client, tdata);
+       if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) {
+               set_transport(publisher, tdata);
+               pjsip_publishc_send(publisher->client, tdata);
        }
 
-       ao2_ref(client, -1);
+       ao2_ref(publisher, -1);
 
        return 0;
 }
 
+static void stop_publishing(struct ast_sip_outbound_publish_client *client,
+                           struct ast_sip_event_publisher_handler *handler)
+{
+       if (!handler) {
+               handler = find_publisher_handler_for_event_name(client->publish->event);
+       }
+
+       if (handler) {
+               handler->stop_publishing(client);
+       }
+}
+
+static int cancel_and_unpublish(void *obj, void *arg, int flags);
+
 /*! \brief Helper function which starts or stops publish clients when applicable */
 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
 {
@@ -373,14 +485,9 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand
                                state->client->started = 1;
                        }
                } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
-                       /* If the publisher client has been started but it is going away stop it */
-                       removed->stop_publishing(state->client);
+                       stop_publishing(state->client, removed);
+                       ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL);
                        state->client->started = 0;
-                       if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
-                               ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
-                                       ast_sorcery_object_get_id(publish));
-                               ao2_ref(state->client, -1);
-                       }
                }
                ao2_ref(publish, -1);
                ao2_ref(state, -1);
@@ -389,25 +496,106 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand
        ao2_ref(states, -1);
 }
 
-struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
+static struct ast_sip_outbound_publish_state *sip_publish_state_get(const char *id)
 {
-       RAII_VAR(struct ao2_container *, states,
-                ao2_global_obj_ref(current_states), ao2_cleanup);
-       RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
+       struct ao2_container *states = ao2_global_obj_ref(current_states);
+       struct ast_sip_outbound_publish_state *res;
 
        if (!states) {
                return NULL;
        }
 
-       state = ao2_find(states, name, OBJ_SEARCH_KEY);
+       res = ao2_find(states, id, OBJ_SEARCH_KEY);
+       ao2_ref(states, -1);
+       return res;
+}
+
+struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
+{
+       struct ast_sip_outbound_publish_state *state = sip_publish_state_get(name);
+
        if (!state) {
                return NULL;
        }
 
        ao2_ref(state->client, +1);
+       ao2_ref(state, -1);
        return state->client;
 }
 
+const char *ast_sip_publish_client_get_from_uri(struct ast_sip_outbound_publish_client *client)
+{
+       struct ast_sip_outbound_publish *publish = client->publish;
+
+       return S_OR(publish->from_uri, S_OR(publish->server_uri, ""));
+}
+
+static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
+        struct ast_sip_outbound_publish_client *client, const char *user);
+
+static struct sip_outbound_publisher *sip_outbound_publish_client_get_publisher(
+       struct ast_sip_outbound_publish_client *client, const char *user)
+{
+       struct sip_outbound_publisher *publisher;
+
+       /*
+        * Lock before searching since there could be a race between searching and adding.
+        * Just use the load_lock since we might need to lock it anyway (if adding) and
+        * also it simplifies the code (otherwise we'd have to lock the publishers, no-
+        * lock the search and pass a flag to 'add publisher to no-lock the potential link).
+        */
+       ast_rwlock_wrlock(&load_lock);
+       publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY);
+       if (!publisher) {
+               if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) {
+                       ast_rwlock_unlock(&load_lock);
+                       return NULL;
+               }
+       }
+       ast_rwlock_unlock(&load_lock);
+
+       return publisher;
+}
+
+const char *ast_sip_publish_client_get_user_from_uri(struct ast_sip_outbound_publish_client *client, const char *user,
+       char *uri, size_t size)
+{
+       struct sip_outbound_publisher *publisher;
+
+       publisher = sip_outbound_publish_client_get_publisher(client, user);
+       if (!publisher) {
+               return NULL;
+       }
+
+       ast_copy_string(uri, publisher->from_uri, size);
+       ao2_ref(publisher, -1);
+
+       return uri;
+}
+
+const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_client *client)
+{
+       struct ast_sip_outbound_publish *publish = client->publish;
+
+       return S_OR(publish->to_uri, S_OR(publish->server_uri, ""));
+}
+
+const char *ast_sip_publish_client_get_user_to_uri(struct ast_sip_outbound_publish_client *client, const char *user,
+       char *uri, size_t size)
+{
+       struct sip_outbound_publisher *publisher;
+
+       publisher = sip_outbound_publish_client_get_publisher(client, user);
+       if (!publisher) {
+               return NULL;
+       }
+
+       ast_copy_string(uri, publisher->to_uri, size);
+       ao2_ref(publisher, -1);
+
+       return uri;
+}
+
 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
 {
        struct ast_sip_event_publisher_handler *existing;
@@ -544,19 +732,19 @@ void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_cli
        ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
 }
 
-static int sip_publish_client_service_queue(void *data)
+static int sip_publisher_service_queue(void *data)
 {
-       RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, client);
+       RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup);
+       SCOPED_AO2LOCK(lock, publisher);
        struct sip_outbound_publish_message *message;
        pjsip_tx_data *tdata;
        pj_status_t status;
 
-       if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
+       if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) {
                return 0;
        }
 
-       if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
+       if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
                goto fatal;
        }
 
@@ -566,7 +754,9 @@ static int sip_publish_client_service_queue(void *data)
                goto fatal;
        }
 
-       status = pjsip_publishc_send(client->client, tdata);
+       set_transport(publisher, tdata);
+
+       status = pjsip_publishc_send(publisher->client, tdata);
        if (status == PJ_EBUSY) {
                /* We attempted to send the message but something else got there first */
                goto service;
@@ -574,30 +764,31 @@ static int sip_publish_client_service_queue(void *data)
                goto fatal;
        }
 
-       client->sending = message;
+       publisher->sending = message;
 
        return 0;
 
 fatal:
-       AST_LIST_REMOVE_HEAD(&client->queue, entry);
+       AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
        ast_free(message);
 
 service:
-       if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
-               ao2_ref(client, -1);
+       if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
+               ao2_ref(publisher, -1);
        }
        return -1;
 }
 
-int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
-       const struct ast_sip_body *body)
+static int publisher_client_send(void *obj, void *arg, void *data, int flags)
 {
-       SCOPED_AO2LOCK(lock, client);
+       struct sip_outbound_publisher *publisher = obj;
+       const struct ast_sip_body *body = arg;
        struct sip_outbound_publish_message *message;
        size_t type_len = 0, subtype_len = 0, body_text_len = 0;
-       int res;
+       int *res = data;
 
-       if (!client->client) {
+       *res = -1;
+       if (!publisher->client) {
                return -1;
        }
 
@@ -623,96 +814,400 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
                message->body.body_text = strcpy(dst, body->body_text);
        }
 
-       AST_LIST_INSERT_TAIL(&client->queue, message, entry);
+       AST_LIST_INSERT_TAIL(&publisher->queue, message, entry);
 
-       res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
-       if (res) {
-               ao2_ref(client, -1);
+       *res = ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher));
+       if (*res) {
+               ao2_ref(publisher, -1);
        }
 
+       return *res;
+}
+
+int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
+       const struct ast_sip_body *body)
+{
+       SCOPED_AO2LOCK(lock, client);
+       int res = 0;
+
+       ao2_callback_data(client->publishers, OBJ_NODATA,
+                         publisher_client_send, (void *)body, &res);
        return res;
 }
 
+static int sip_outbound_publisher_set_uri(
+       pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri)
+{
+       pj_str_t tmp;
+       pjsip_uri *parsed;
+       pjsip_sip_uri *parsed_uri;
+       int size;
+
+       pj_strdup2_with_null(pool, &tmp, uri);
+       if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) {
+               return -1;
+       }
+
+       if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
+               return -1;
+       }
+
+       if (!ast_strlen_zero(user)) {
+               pj_strdup2(pool, &parsed_uri->user, user);
+       }
+
+       res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size);
+       if (!res_uri->ptr) {
+               return -1;
+       }
+
+       if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr,
+                                   pjsip_max_url_size - 1)) <= 0) {
+               return -1;
+       }
+       res_uri->ptr[size] = '\0';
+       res_uri->slen = size;
+
+       return 0;
+}
+
+static int sip_outbound_publisher_set_uris(
+       pj_pool_t *pool, struct sip_outbound_publisher *publisher,
+       pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri)
+{
+       struct ast_sip_outbound_publish *publish = publisher->owner->publish;
+
+       if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) {
+               ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
+                       publish->server_uri, ast_sorcery_object_get_id(publish));
+               return -1;
+       }
+
+       if (ast_strlen_zero(publish->to_uri)) {
+               to_uri->ptr = server_uri->ptr;
+               to_uri->slen = server_uri->slen;
+       } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) {
+               ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
+                       publish->to_uri, ast_sorcery_object_get_id(publish));
+               return -1;
+       }
+
+       publisher->to_uri = ast_strdup(to_uri->ptr);
+       if (!publisher->to_uri) {
+               return -1;
+       }
+
+       if (ast_strlen_zero(publish->from_uri)) {
+               from_uri->ptr = server_uri->ptr;
+               from_uri->slen = server_uri->slen;
+       } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) {
+               ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
+                       publish->from_uri, ast_sorcery_object_get_id(publish));
+               return -1;
+       }
+
+       publisher->from_uri = ast_strdup(from_uri->ptr);
+       if (!publisher->from_uri) {
+               return -1;
+       }
+
+       return 0;
+}
+
+static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
+
+/*! \brief Helper function that allocates a pjsip publish client and configures it */
+static int sip_outbound_publisher_init(void *data)
+{
+       struct sip_outbound_publisher *publisher = data;
+       RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
+       pjsip_publishc_opt opt = {
+               .queue_request = PJ_FALSE,
+       };
+       pj_pool_t *pool;
+       pj_str_t event, server_uri, to_uri, from_uri;
+
+       if (publisher->client) {
+               return 0;
+       }
+
+       if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt,
+                                 ao2_bump(publisher), sip_outbound_publish_callback,
+               &publisher->client) != PJ_SUCCESS) {
+               ao2_ref(publisher, -1);
+               return -1;
+       }
+
+       publish = ao2_bump(publisher->owner->publish);
+
+       if (!ast_strlen_zero(publish->outbound_proxy)) {
+               pjsip_route_hdr route_set, *route;
+               static const pj_str_t ROUTE_HNAME = { "Route", 5 };
+
+               pj_list_init(&route_set);
+
+               if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME,
+                       (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
+                       pjsip_publishc_destroy(publisher->client);
+                       return -1;
+               }
+               pj_list_insert_nodes_before(&route_set, route);
+
+               pjsip_publishc_set_route_set(publisher->client, &route_set);
+       }
+
+       pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation",
+                                      pjsip_max_url_size, pjsip_max_url_size);
+       if (!pool) {
+               ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
+                       ast_sorcery_object_get_id(publish));
+               pjsip_publishc_destroy(publisher->client);
+               return -1;
+       }
+
+       if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) {
+               pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+               pjsip_publishc_destroy(publisher->client);
+               return -1;
+       }
+
+       pj_cstr(&event, publish->event);
+       if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri,
+                               publish->expiration != PJ_SUCCESS)) {
+               ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n",
+                       ast_sorcery_object_get_id(publish));
+               pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+               pjsip_publishc_destroy(publisher->client);
+               return -1;
+       }
+
+       pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+       return 0;
+}
+
+static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags)
+{
+       return sip_outbound_publisher_init(obj);
+}
+
+static int sip_outbound_publisher_reinit_all(void *data)
+{
+       ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL);
+       return 0;
+}
+
 /*! \brief Destructor function for publish client */
-static void sip_outbound_publish_client_destroy(void *obj)
+static void sip_outbound_publisher_destroy(void *obj)
 {
-       struct ast_sip_outbound_publish_client *client = obj;
+       struct sip_outbound_publisher *publisher = obj;
        struct sip_outbound_publish_message *message;
 
        /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
 
-       while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+       while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
                ast_free(message);
        }
 
-       ao2_cleanup(client->datastores);
-       ao2_cleanup(client->publish);
+       ao2_cleanup(publisher->owner);
+       ast_free(publisher->from_uri);
+       ast_free(publisher->to_uri);
+
+       ast_taskprocessor_unreference(publisher->serializer);
+}
+
+static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
+       struct ast_sip_outbound_publish_client *client, const char *user)
+{
+       struct sip_outbound_publisher *publisher;
+       char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+       publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1,
+                             sip_outbound_publisher_destroy);
+       if (!publisher) {
+               return NULL;
+       }
+
+       /*
+        * Bump the ref to the client. This essentially creates a circular reference,
+        * but it is needed in order to make sure the client object doesn't get pulled
+        * out from under us when the publisher stops publishing.
+        *
+        * The circular reference is alleviated by calling cancel_and_unpublish for
+        * each client, from the state's destructor. By calling it there all references
+        * to the publishers should go to zero, thus calling the publisher's destructor.
+        * This in turn removes the client reference we added here. The state then removes
+        * its reference to the client, which should take it to zero.
+        */
+       publisher->owner = ao2_bump(client);
+       publisher->timer.user_data = publisher;
+       publisher->timer.cb = sip_outbound_publish_timer_cb;
+       if (user) {
+               strcpy(publisher->user, user);
+       } else {
+               *publisher->user = '\0';
+       }
+
+       ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
+               ast_sorcery_object_get_id(client->publish));
+
+       publisher->serializer = ast_sip_create_serializer_group(tps_name,
+               shutdown_group);
+       if (!publisher->serializer) {
+               ao2_ref(publisher, -1);
+               return NULL;
+       }
+
+       if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
+               ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
+                       ast_sorcery_object_get_id(client->publish));
+               ao2_ref(publisher, -1);
+               return NULL;
+       }
+
+       return publisher;
+}
 
-       /* if unloading the module and all objects have been unpublished
-          send the signal to finish unloading */
-       if (unloading.is_unloading) {
-               ast_mutex_lock(&unloading.lock);
-               if (--unloading.count == 0) {
-                       ast_cond_signal(&unloading.cond);
+static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
+       struct ast_sip_outbound_publish_client *client, const char *user)
+{
+       struct sip_outbound_publisher *publisher =
+               sip_outbound_publisher_alloc(client, user);
+
+       if (!publisher) {
+               return NULL;
+       }
+
+       if (!ao2_link(client->publishers, publisher)) {
+               /*
+                * No need to bump the reference here. The task will take care of
+                * removing the reference.
+                */
+               if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, publisher)) {
+                       ao2_ref(publisher, -1);
                }
-               ast_mutex_unlock(&unloading.lock);
+               return NULL;
        }
+
+       return publisher;
+}
+
+int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
+                                    const char *user, const struct ast_sip_body *body)
+{
+       struct sip_outbound_publisher *publisher;
+       int res;
+
+       publisher = sip_outbound_publish_client_get_publisher(client, user);
+       if (!publisher) {
+               return -1;
+       }
+
+       publisher_client_send(publisher, (void *)body, &res, 0);
+       ao2_ref(publisher, -1);
+       return res;
+}
+
+void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
+                                  const char *user)
+{
+       SCOPED_WRLOCK(lock, &load_lock);
+       ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
 }
 
 static int explicit_publish_destroy(void *data)
 {
-       struct ast_sip_outbound_publish_client *client = data;
+       struct sip_outbound_publisher *publisher = data;
 
-       pjsip_publishc_destroy(client->client);
-       ao2_ref(client, -1);
+       /*
+        * If there is no pjsip publishing client then we obviously don't need
+        * to destroy it. Also, the ref for the Asterisk publishing client that
+        * pjsip had would not exist or should already be gone as well.
+        */
+       if (publisher->client) {
+               pjsip_publishc_destroy(publisher->client);
+               ao2_ref(publisher, -1);
+       }
+
+       ao2_ref(publisher, -1);
 
        return 0;
 }
 
 /*! \brief Helper function which cancels and un-publishes a no longer used client */
-static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
+static int cancel_and_unpublish(void *obj, void *arg, int flags)
 {
-       struct ast_sip_event_publisher_handler *handler;
-       SCOPED_AO2LOCK(lock, client);
+       struct sip_outbound_publisher *publisher = obj;
+       struct ast_sip_outbound_publish_client *client = publisher->owner;
+
+       SCOPED_AO2LOCK(lock, publisher);
 
        if (!client->started) {
-               /* If the client was never started, there's nothing to unpublish, so just
-                * destroy the publication and remove its reference to the client.
+               /* If the publisher was never started, there's nothing to unpublish, so just
+                * destroy the publication and remove its reference to the publisher.
                 */
-               ast_sip_push_task(NULL, explicit_publish_destroy, client);
+               if (ast_sip_push_task(publisher->serializer, explicit_publish_destroy, ao2_bump(publisher))) {
+                       ao2_ref(publisher, -1);
+               }
                return 0;
        }
 
-       handler = find_publisher_handler_for_event_name(client->publish->event);
-       if (handler) {
-               handler->stop_publishing(client);
-       }
-
-       client->started = 0;
-       if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+       if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, ao2_bump(publisher))) {
                ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
                        ast_sorcery_object_get_id(client->publish));
-               ao2_ref(client, -1);
+               ao2_ref(publisher, -1);
        }
 
        /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
-       if (!client->sending) {
-               if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+       if (!publisher->sending) {
+               if (ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
                        ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
                                ast_sorcery_object_get_id(client->publish));
-                       ao2_ref(client, -1);
+                       ao2_ref(publisher, -1);
                }
        }
-       client->destroy = 1;
+       publisher->destroy = 1;
        return 0;
 }
 
+/*! \brief Destructor function for publish client */
+static void sip_outbound_publish_client_destroy(void *obj)
+{
+       struct ast_sip_outbound_publish_client *client = obj;
+
+       ao2_cleanup(client->datastores);
+
+       /*
+        * The client's publishers have already been unpublished and destroyed
+        * by this point, so it is safe to finally remove the reference to the
+        * publish object. The client needed to hold a reference to it until
+        * the publishers were done with it.
+        */
+       ao2_cleanup(client->publish);
+}
+
 /*! \brief Destructor function for publish state */
 static void sip_outbound_publish_state_destroy(void *obj)
 {
        struct ast_sip_outbound_publish_state *state = obj;
 
-       cancel_and_unpublish(state->client);
+       stop_publishing(state->client, NULL);
+       /*
+        * Since the state is being destroyed the associated client needs to also
+        * be destroyed. However simply removing the reference to the client will
+        * not initiate client destruction since the client's publisher(s) hold a
+        * reference to the client object as well. So we need to unpublish the
+        * the client's publishers here, which will remove the publisher's client
+        * reference during that process.
+        *
+        * That being said we don't want to remove the client's reference to the
+        * publish object just yet. We'll hold off on that until client destruction
+        * itself. This is because the publishers need access to the client's
+        * publish object while they are unpublishing.
+        */
+       ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL);
+       ao2_cleanup(state->client->publishers);
+
+       state->client->started = 0;
        ao2_cleanup(state->client);
 }
 
@@ -746,123 +1241,32 @@ static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct a
        return 1;
 }
 
-static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
-
-/*! \brief Helper function that allocates a pjsip publish client and configures it */
-static int sip_outbound_publish_client_alloc(void *data)
-{
-       struct ast_sip_outbound_publish_client *client = data;
-       RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
-       pjsip_publishc_opt opt = {
-               .queue_request = PJ_FALSE,
-       };
-       pj_str_t event, server_uri, to_uri, from_uri;
-       pj_status_t status;
-
-       if (client->client) {
-               return 0;
-       } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
-               &client->client) != PJ_SUCCESS) {
-               ao2_ref(client, -1);
-               return -1;
-       }
-
-       publish = ao2_bump(client->publish);
-
-       if (!ast_strlen_zero(publish->outbound_proxy)) {
-               pjsip_route_hdr route_set, *route;
-               static const pj_str_t ROUTE_HNAME = { "Route", 5 };
-
-               pj_list_init(&route_set);
-
-               if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
-                       (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
-                       pjsip_publishc_destroy(client->client);
-                       return -1;
-               }
-               pj_list_insert_nodes_before(&route_set, route);
-
-               pjsip_publishc_set_route_set(client->client, &route_set);
-       }
-
-       pj_cstr(&event, publish->event);
-       pj_cstr(&server_uri, publish->server_uri);
-       pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
-       pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
-
-       status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
-               publish->expiration);
-       if (status == PJSIP_EINVALIDURI) {
-               pj_pool_t *pool;
-               pj_str_t tmp;
-               pjsip_uri *uri;
-
-               pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
-               if (!pool) {
-                       ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
-                               ast_sorcery_object_get_id(publish));
-                       pjsip_publishc_destroy(client->client);
-                       return -1;
-               }
-
-               pj_strdup2_with_null(pool, &tmp, publish->server_uri);
-               uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
-               if (!uri) {
-                       ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
-                               publish->server_uri, ast_sorcery_object_get_id(publish));
-               }
-
-               if (!ast_strlen_zero(publish->to_uri)) {
-                       pj_strdup2_with_null(pool, &tmp, publish->to_uri);
-                       uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
-                       if (!uri) {
-                               ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
-                                       publish->to_uri, ast_sorcery_object_get_id(publish));
-                       }
-               }
-
-               if (!ast_strlen_zero(publish->from_uri)) {
-                       pj_strdup2_with_null(pool, &tmp, publish->from_uri);
-                       uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
-                       if (!uri) {
-                               ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
-                                       publish->from_uri, ast_sorcery_object_get_id(publish));
-                       }
-               }
-
-               pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
-               pjsip_publishc_destroy(client->client);
-               return -1;
-       } else if (status != PJ_SUCCESS) {
-               pjsip_publishc_destroy(client->client);
-               return -1;
-       }
-
-       return 0;
-}
-
 /*! \brief Callback function for publish client responses */
 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
 {
-       RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
-       RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
-       SCOPED_AO2LOCK(lock, client);
+#define DESTROY_CLIENT() do {                     \
+       pjsip_publishc_destroy(publisher->client); \
+       publisher->client = NULL; \
+       ao2_ref(publisher, -1); } while (0)
+
+       RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup);
+       RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup);
+       SCOPED_AO2LOCK(lock, publisher);
        pjsip_tx_data *tdata;
 
-       if (client->destroy) {
-               if (client->sending) {
-                       client->sending = NULL;
+       if (publisher->destroy) {
+               if (publisher->sending) {
+                       publisher->sending = NULL;
 
-                       if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+                       if (!ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
                                return;
                        }
                        ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
                                ast_sorcery_object_get_id(publish));
-                       ao2_ref(client, -1);
+                       ao2_ref(publisher, -1);
                }
-               /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
-               pjsip_publishc_destroy(client->client);
-               ao2_ref(client, -1);
+               /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */
+               DESTROY_CLIENT();
                return;
        }
 
@@ -871,14 +1275,13 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
 
                if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
                                param->rdata, tsx->last_tx, &tdata)) {
-                       pjsip_publishc_send(client->client, tdata);
+                       set_transport(publisher, tdata);
+                       pjsip_publishc_send(publisher->client, tdata);
                }
-               client->auth_attempts++;
-
-               if (client->auth_attempts == publish->max_auth_attempts) {
-                       pjsip_publishc_destroy(client->client);
-                       client->client = NULL;
+               publisher->auth_attempts++;
 
+               if (publisher->auth_attempts == publish->max_auth_attempts) {
+                       DESTROY_CLIENT();
                        ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
                                ast_sorcery_object_get_id(publish));
 
@@ -887,60 +1290,57 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
                return;
        }
 
-       client->auth_attempts = 0;
+       publisher->auth_attempts = 0;
 
        if (param->code == 412) {
-               pjsip_publishc_destroy(client->client);
-               client->client = NULL;
-
-               if (sip_outbound_publish_client_alloc(client)) {
+               DESTROY_CLIENT();
+               if (sip_outbound_publisher_init(publisher)) {
                        ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
                                ast_sorcery_object_get_id(publish));
                        goto end;
                }
 
                /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
-               client->sending = NULL;
+               publisher->sending = NULL;
        } else if (param->code == 423) {
                /* Update the expiration with the new expiration time if available */
                pjsip_expires_hdr *expires;
 
                expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
                if (!expires || !expires->ivalue) {
+                       DESTROY_CLIENT();
                        ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
                                ast_sorcery_object_get_id(publish));
-                       pjsip_publishc_destroy(client->client);
-                       client->client = NULL;
                        goto end;
                }
 
-               pjsip_publishc_update_expires(client->client, expires->ivalue);
-               client->sending = NULL;
-       } else if (client->sending) {
+               pjsip_publishc_update_expires(publisher->client, expires->ivalue);
+               publisher->sending = NULL;
+       } else if (publisher->sending) {
                /* Remove the message currently being sent so that when the queue is serviced another will get sent */
-               AST_LIST_REMOVE_HEAD(&client->queue, entry);
-               ast_free(client->sending);
-               client->sending = NULL;
+               AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
+               ast_free(publisher->sending);
+               publisher->sending = NULL;
                if (!param->rdata) {
                        ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
                                ast_sorcery_object_get_id(publish));
                }
        }
 
-       if (AST_LIST_EMPTY(&client->queue)) {
-               schedule_publish_refresh(client, param->expiration);
+       if (AST_LIST_EMPTY(&publisher->queue)) {
+               schedule_publish_refresh(publisher, param->expiration);
        }
 
 end:
-       if (!client->client) {
+       if (!publisher->client) {
                struct sip_outbound_publish_message *message;
 
-               while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+               while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
                        ast_free(message);
                }
        } else {
-               if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
-                       ao2_ref(client, -1);
+               if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
+                       ao2_ref(publisher, -1);
                }
        }
 }
@@ -1028,33 +1428,103 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
                return NULL;
        }
 
-       state->client->timer.user_data = state->client;
-       state->client->timer.cb = sip_outbound_publish_timer_cb;
+       state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn,
+                                                       sip_outbound_publisher_cmp_fn);
+       if (!state->client->publishers) {
+               ao2_ref(state, -1);
+               return NULL;
+       }
+
        state->client->publish = ao2_bump(publish);
 
        strcpy(state->id, id);
        return state;
 }
 
-/*! \brief Apply function which finds or allocates a state structure */
-static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
+static int validate_publish_config(struct ast_sip_outbound_publish *publish)
 {
-       RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
-       RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
-       struct ast_sip_outbound_publish *applied = obj;
-
-       if (ast_strlen_zero(applied->server_uri)) {
+       if (ast_strlen_zero(publish->server_uri)) {
                ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
-                       ast_sorcery_object_get_id(applied));
+                       ast_sorcery_object_get_id(publish));
                return -1;
-       } else if (ast_strlen_zero(applied->event)) {
+       } else if (ast_strlen_zero(publish->event)) {
                ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
-                       ast_sorcery_object_get_id(applied));
+                       ast_sorcery_object_get_id(publish));
                return -1;
        }
+       return 0;
+}
+
+static int current_state_reusable(struct ast_sip_outbound_publish *publish,
+                                 struct ast_sip_outbound_publish_state *current_state)
+{
+       struct ast_sip_outbound_publish *old_publish;
 
+       /*
+        * Don't maintain the old state/client objects if the multi_user option changed.
+        */
+       if ((!publish->multi_user && current_state->client->publish->multi_user) ||
+           (publish->multi_user && !current_state->client->publish->multi_user)) {
+               return 0;
+       }
+
+
+       if (!can_reuse_publish(current_state->client->publish, publish)) {
+               /*
+                * Something significant has changed in the configuration, so we are
+                * unable to use the old state object. The current state needs to go
+                * away and a new one needs to be created.
+                */
+               return 0;
+       }
+
+       /*
+        * We can reuse the current state object so keep it, but swap out the
+        * underlying publish object with the new one.
+        */
+       old_publish = current_state->client->publish;
+       current_state->client->publish = publish;
+       if (ast_sip_push_task_synchronous(
+                   NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
+               /*
+                * If the state object fails to re-initialize then swap
+                * the old publish info back in.
+                */
+               current_state->client->publish = publish;
+               ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n",
+                       ast_sorcery_object_get_id(current_state->client->publish));
+               return -1;
+       }
+
+       /*
+        * Since we swapped out the publish object the new one needs a ref
+        * while the old one needs to go away.
+        */
+       ao2_ref(current_state->client->publish, +1);
+       ao2_cleanup(old_publish);
+
+       /* Tell the caller that the current state object should be used */
+       return 1;
+}
+
+/*! \brief Apply function which finds or allocates a state structure */
+static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
+{
+#define ADD_TO_NEW_STATES(__obj) \
+       do { if (__obj) { \
+            ao2_link(new_states, __obj); \
+            ao2_ref(__obj, -1); } } while (0)
+
+       struct ast_sip_outbound_publish *applied = obj;
+       struct ast_sip_outbound_publish_state *current_state, *new_state;
+       struct sip_outbound_publisher *publisher = NULL;
+       int res;
+
+       /*
+        * New states are being loaded or reloaded. We'll need to add the new
+        * object if created/updated, or keep the old object if an error occurs.
+        */
        if (!new_states) {
-               /* make sure new_states has been allocated as we will be adding to it */
                new_states = ao2_container_alloc_options(
                        AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
                        outbound_publish_state_hash, outbound_publish_state_cmp);
@@ -1065,35 +1535,46 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o
                }
        }
 
-       if (states) {
-               state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
-               if (state) {
-                       if (can_reuse_publish(state->client->publish, applied)) {
-                               ao2_replace(state->client->publish, applied);
-                       } else {
-                               ao2_ref(state, -1);
-                               state = NULL;
-                       }
-               }
+       /* If there is current state we'll want to maintain it if any errors occur */
+       current_state = sip_publish_state_get(ast_sorcery_object_get_id(applied));
+
+       if ((res = validate_publish_config(applied))) {
+               ADD_TO_NEW_STATES(current_state);
+               return res;
        }
 
-       if (!state) {
-               state = sip_outbound_publish_state_alloc(applied);
-               if (!state) {
-                       ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
-                               ast_sorcery_object_get_id(applied));
-                       return -1;
-               };
+       if (current_state && (res = current_state_reusable(applied, current_state))) {
+               /*
+                * The current state object was able to be reused, or an error
+                * occurred. Either way we keep the current state and be done.
+                */
+               ADD_TO_NEW_STATES(current_state);
+               return res == 1 ? 0 : -1;
        }
 
-       if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
-               ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
+       /*
+        * No current state was found or it was unable to be reused. Either way
+        * we'll need to create a new state object.
+        */
+       new_state = sip_outbound_publish_state_alloc(applied);
+       if (!new_state) {
+               ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
                        ast_sorcery_object_get_id(applied));
+               ADD_TO_NEW_STATES(current_state);
+               return -1;
+       };
+
+       if (!applied->multi_user &&
+           !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) {
+               ADD_TO_NEW_STATES(current_state);
+               ao2_ref(new_state, -1);
                return -1;
        }
+       ao2_cleanup(publisher);
 
-       ao2_link(new_states, state);
-       return 0;
+       ADD_TO_NEW_STATES(new_state);
+       ao2_cleanup(current_state);
+       return res;
 }
 
 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
@@ -1103,15 +1584,51 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
        return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
 }
 
+
+static int unload_module(void)
+{
+       int remaining;
+
+       ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
+
+       ao2_global_obj_release(current_states);
+
+       /* Wait for publication serializers to get destroyed. */
+       ast_debug(2, "Waiting for publication to complete for unload.\n");
+       remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
+       if (remaining) {
+               ast_log(LOG_WARNING, "Unload incomplete.  Could not stop %d outbound publications.  Try again later.\n",
+                       remaining);
+               return -1;
+       }
+
+       ast_debug(2, "Successful shutdown.\n");
+
+       ao2_cleanup(shutdown_group);
+       shutdown_group = NULL;
+
+       return 0;
+}
+
 static int load_module(void)
 {
        CHECK_PJSIP_MODULE_LOADED();
 
+       /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
+       ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size);
+
+       shutdown_group = ast_serializer_shutdown_group_alloc();
+       if (!shutdown_group) {
+               return AST_MODULE_LOAD_FAILURE;
+       }
+
        ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
        ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
 
        if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
                sip_outbound_publish_apply)) {
+               ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
+               unload_module();
                return AST_MODULE_LOAD_DECLINE;
        }
 
@@ -1123,7 +1640,9 @@ static int load_module(void)
        ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
        ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
        ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
+       ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
        ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
+       ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
 
        ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
 
@@ -1146,48 +1665,6 @@ static int reload_module(void)
        return 0;
 }
 
-static int unload_module(void)
-{
-       struct timeval start = ast_tvnow();
-       struct timespec end = {
-               .tv_sec = start.tv_sec + 10,
-               .tv_nsec = start.tv_usec * 1000
-       };
-       int res = 0;
-       struct ao2_container *states = ao2_global_obj_ref(current_states);
-
-       if (!states || !(unloading.count = ao2_container_count(states))) {
-               return 0;
-       }
-       ao2_ref(states, -1);
-
-       ast_mutex_init(&unloading.lock);
-       ast_cond_init(&unloading.cond, NULL);
-       ast_mutex_lock(&unloading.lock);
-
-       unloading.is_unloading = 1;
-       ao2_global_obj_release(current_states);
-
-       /* wait for items to unpublish */
-       ast_verb(5, "Waiting to complete unpublishing task(s)\n");
-       while (unloading.count) {
-               res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
-       }
-       ast_mutex_unlock(&unloading.lock);
-
-       ast_mutex_destroy(&unloading.lock);
-       ast_cond_destroy(&unloading.cond);
-
-       if (res) {
-               ast_verb(5, "At least %d items were unable to unpublish "
-                       "in the allowed time\n", unloading.count);
-       } else {
-               ast_verb(5, "All items successfully unpublished\n");
-       }
-
-       return res;
-}
-
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
        .load = load_module,
        .reload = reload_module,