#include <pjsip.h>
#include <pjsip_simple.h>
+#include "asterisk/res_pjproject.h"
#include "asterisk/res_pjsip.h"
#include "asterisk/res_pjsip_outbound_publish.h"
#include "asterisk/module.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/datastore.h"
/*** DOCUMENTATION
<configOption name="max_auth_attempts" default="5">
<synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
</configOption>
+ <configOption name="transport">
+ <synopsis>Transport used for outbound publish</synopsis>
+ <description>
+ <note><para>A <replaceable>transport</replaceable> configured in
+ <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
+ </description>
+ </configOption>
+ <configOption name="multi_user" default="no">
+ <synopsis>Enable multi-user support</synopsis>
+ <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
+ </configOption>
<configOption name="type">
<synopsis>Must be of type 'outbound-publish'.</synopsis>
</configOption>
</configInfo>
***/
+static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE;
+
/*! \brief Queued outbound publish message */
struct sip_outbound_publish_message {
/*! \brief Optional body */
char body_contents[0];
};
+/*
+ * A note about some of the object types used in this module:
+ *
+ * The reason we currently have 4 separate object types that relate to configuration,
+ * publishing, state, and client information is due to object lifetimes and order of
+ * destruction dependencies.
+ *
+ * Separation of concerns is a good thing and of course it makes sense to have a
+ * configuration object type as well as an object type wrapper around pjsip's publishing
+ * client class. There also may be run time state data that needs to be tracked, so
+ * again having something to handle that is prudent. However, it may be tempting to think
+ * "why not combine the state and client object types?" Especially seeing as how they have
+ * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit
+ * more awkward.
+ *
+ * Currently this module maintains a global container of current state objects. When this
+ * states container is replaced, or deleted, it un-references all contained objects. Any
+ * state with a reference left have probably been carried over from a reload/realtime fetch.
+ * States not carried over are destructed and the associated client (and all its publishers)
+ * get unpublished.
+ *
+ * This "unpublishing" goes through a careful process of unpublishing the client, all its
+ * publishers, and making sure all the appropriate references are removed in a sane order.
+ * This process is essentially kicked off with the destruction of the state. If the state
+ * and client objects were to be merged, where clients became the globally tracked object
+ * type, this "unpublishing" process would never start because of the multiple references
+ * held to the client object over it's lifetime. Meaning the global tracking container
+ * would remove its reference to the client object when done with it, but other sources
+ * would still be holding a reference to it (namely the datastore and publisher(s)).
+ *
+ * Thus at this time it is easier to keep them separate.
+ */
+
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish {
/*! \brief Sorcery object details */
AST_STRING_FIELD(from_uri);
/*! \brief URI for the To header */
AST_STRING_FIELD(to_uri);
+ /*! \brief Explicit transport to use for publish */
+ AST_STRING_FIELD(transport);
/*! \brief Outbound proxy to use */
AST_STRING_FIELD(outbound_proxy);
/*! \brief The event type to publish */
unsigned int max_auth_attempts;
/*! \brief Configured authentication credentials */
struct ast_sip_auth_vector outbound_auths;
+ /*! \brief The publishing client is used for multiple users when true */
+ unsigned int multi_user;
};
-/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
-struct ast_sip_outbound_publish_client {
+struct sip_outbound_publisher {
+ /*! \brief The client object that 'owns' this client
+
+ \note any potential circular reference problems are accounted
+ for (see publisher alloc for more information)
+ */
+ struct ast_sip_outbound_publish_client *owner;
/*! \brief Underlying publish client */
pjsip_publishc *client;
+ /*! \brief The From URI for this specific publisher */
+ char *from_uri;
+ /*! \brief The To URI for this specific publisher */
+ char *to_uri;
/*! \brief Timer entry for refreshing publish */
pj_timer_entry timer;
- /*! \brief Publisher datastores set up by handlers */
- struct ao2_container *datastores;
/*! \brief The number of auth attempts done */
unsigned int auth_attempts;
/*! \brief Queue of outgoing publish messages to send*/
AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
/*! \brief The message currently being sent */
struct sip_outbound_publish_message *sending;
- /*! \brief Publish client has been fully started and event type informed */
- unsigned int started;
/*! \brief Publish client should be destroyed */
unsigned int destroy;
+ /*! \brief Serializer for stuff and things */
+ struct ast_taskprocessor *serializer;
+ /*! \brief User, if any, associated with the publisher */
+ char user[0];
+};
+
+/*! \brief Outbound publish client state information (persists for lifetime of a publish) */
+struct ast_sip_outbound_publish_client {
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish *publish;
+ /*! \brief Publisher datastores set up by handlers */
+ struct ao2_container *datastores;
+ /*! \brief Container of all the client publishing objects */
+ struct ao2_container *publishers;
+ /*! \brief Publishing has been fully started and event type informed */
+ unsigned int started;
};
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
char id[0];
};
-/*! \brief Unloading data */
-struct unloading_data {
- int is_unloading;
- int count;
- ast_mutex_t lock;
- ast_cond_t cond;
-} unloading;
+/*!
+ * \brief Used for locking while loading/reloading
+ *
+ * Mutli-user configurations make it so publishers can be dynamically added and
+ * removed. Publishers should not be added or removed during a [re]load since
+ * it could cause the current_clients container to be out of sync. Thus the
+ * reason for this lock.
+ */
+AST_RWLOCK_DEFINE_STATIC(load_lock);
+
+#define DEFAULT_PUBLISHER_BUCKETS 119
+AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user);
+AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user);
+
+/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
+#define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */
+
+/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
+static struct ast_serializer_shutdown_group *shutdown_group;
/*! \brief Default number of client state container buckets */
#define DEFAULT_STATE_BUCKETS 31
static struct ao2_container *get_publishes_and_update_state(void)
{
struct ao2_container *container;
+ SCOPED_WRLOCK(lock, &load_lock);
container = ast_sorcery_retrieve_by_fields(
ast_sip_get_sorcery(), "outbound-publish",
return iter;
}
-/*! \brief Helper function which cancels the refresh timer on a client */
-static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
+/*! \brief Helper function which cancels the refresh timer on a publisher */
+static void cancel_publish_refresh(struct sip_outbound_publisher *publisher)
{
- if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
- /* The timer was successfully cancelled, drop the refcount of the client */
- ao2_ref(client, -1);
+ if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) {
+ /* The timer was successfully cancelled, drop the refcount of the publisher */
+ ao2_ref(publisher, -1);
}
}
/*! \brief Helper function which sets up the timer to send publication */
-static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration)
+static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration)
{
- struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
+ struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish);
pj_time_val delay = { .sec = 0, };
- cancel_publish_refresh(client);
+ cancel_publish_refresh(publisher);
if (expiration > 0) {
delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
}
- ao2_ref(client, +1);
- if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
+ ao2_ref(publisher, +1);
+ if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) {
ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
ao2_ref(publish, -1);
}
+static int publisher_client_send(void *obj, void *arg, void *data, int flags);
+
/*! \brief Publish client timer callback function */
static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
{
- struct ast_sip_outbound_publish_client *client = entry->user_data;
+ struct sip_outbound_publisher *publisher = entry->user_data;
- ao2_lock(client);
- if (AST_LIST_EMPTY(&client->queue)) {
+ ao2_lock(publisher);
+ if (AST_LIST_EMPTY(&publisher->queue)) {
+ int res;
/* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
- ast_sip_publish_client_send(client, NULL);
+ publisher_client_send(publisher, NULL, &res, 0);
}
- ao2_unlock(client);
+ ao2_unlock(publisher);
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
/*! \brief Task for cancelling a refresh timer */
static int cancel_refresh_timer_task(void *data)
{
- struct ast_sip_outbound_publish_client *client = data;
+ struct sip_outbound_publisher *publisher = data;
- cancel_publish_refresh(client);
- ao2_ref(client, -1);
+ cancel_publish_refresh(publisher);
+ ao2_ref(publisher, -1);
return 0;
}
+static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata)
+{
+ if (!ast_strlen_zero(publisher->owner->publish->transport)) {
+ pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
+ ast_sip_set_tpselector_from_transport_name(
+ publisher->owner->publish->transport, &selector);
+ pjsip_tx_data_set_transport(tdata, &selector);
+ }
+}
+
/*! \brief Task for sending an unpublish */
static int send_unpublish_task(void *data)
{
- struct ast_sip_outbound_publish_client *client = data;
+ struct sip_outbound_publisher *publisher = data;
pjsip_tx_data *tdata;
- if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
- pjsip_publishc_send(client->client, tdata);
+ if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) {
+ set_transport(publisher, tdata);
+ pjsip_publishc_send(publisher->client, tdata);
}
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
return 0;
}
+static void stop_publishing(struct ast_sip_outbound_publish_client *client,
+ struct ast_sip_event_publisher_handler *handler)
+{
+ if (!handler) {
+ handler = find_publisher_handler_for_event_name(client->publish->event);
+ }
+
+ if (handler) {
+ handler->stop_publishing(client);
+ }
+}
+
+static int cancel_and_unpublish(void *obj, void *arg, int flags);
+
/*! \brief Helper function which starts or stops publish clients when applicable */
static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
{
state->client->started = 1;
}
} else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
- /* If the publisher client has been started but it is going away stop it */
- removed->stop_publishing(state->client);
+ stop_publishing(state->client, removed);
+ ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL);
state->client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
- ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
- ast_sorcery_object_get_id(publish));
- ao2_ref(state->client, -1);
- }
}
ao2_ref(publish, -1);
ao2_ref(state, -1);
ao2_ref(states, -1);
}
-struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
+static struct ast_sip_outbound_publish_state *sip_publish_state_get(const char *id)
{
- RAII_VAR(struct ao2_container *, states,
- ao2_global_obj_ref(current_states), ao2_cleanup);
- RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
+ struct ao2_container *states = ao2_global_obj_ref(current_states);
+ struct ast_sip_outbound_publish_state *res;
if (!states) {
return NULL;
}
- state = ao2_find(states, name, OBJ_SEARCH_KEY);
+ res = ao2_find(states, id, OBJ_SEARCH_KEY);
+ ao2_ref(states, -1);
+ return res;
+}
+
+struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
+{
+ struct ast_sip_outbound_publish_state *state = sip_publish_state_get(name);
+
if (!state) {
return NULL;
}
ao2_ref(state->client, +1);
+ ao2_ref(state, -1);
return state->client;
}
+const char *ast_sip_publish_client_get_from_uri(struct ast_sip_outbound_publish_client *client)
+{
+ struct ast_sip_outbound_publish *publish = client->publish;
+
+ return S_OR(publish->from_uri, S_OR(publish->server_uri, ""));
+}
+
+static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
+ struct ast_sip_outbound_publish_client *client, const char *user);
+
+static struct sip_outbound_publisher *sip_outbound_publish_client_get_publisher(
+ struct ast_sip_outbound_publish_client *client, const char *user)
+{
+ struct sip_outbound_publisher *publisher;
+
+ /*
+ * Lock before searching since there could be a race between searching and adding.
+ * Just use the load_lock since we might need to lock it anyway (if adding) and
+ * also it simplifies the code (otherwise we'd have to lock the publishers, no-
+ * lock the search and pass a flag to 'add publisher to no-lock the potential link).
+ */
+ ast_rwlock_wrlock(&load_lock);
+ publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY);
+ if (!publisher) {
+ if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) {
+ ast_rwlock_unlock(&load_lock);
+ return NULL;
+ }
+ }
+ ast_rwlock_unlock(&load_lock);
+
+ return publisher;
+}
+
+const char *ast_sip_publish_client_get_user_from_uri(struct ast_sip_outbound_publish_client *client, const char *user,
+ char *uri, size_t size)
+{
+ struct sip_outbound_publisher *publisher;
+
+ publisher = sip_outbound_publish_client_get_publisher(client, user);
+ if (!publisher) {
+ return NULL;
+ }
+
+ ast_copy_string(uri, publisher->from_uri, size);
+ ao2_ref(publisher, -1);
+
+ return uri;
+}
+
+const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_client *client)
+{
+ struct ast_sip_outbound_publish *publish = client->publish;
+
+ return S_OR(publish->to_uri, S_OR(publish->server_uri, ""));
+}
+
+const char *ast_sip_publish_client_get_user_to_uri(struct ast_sip_outbound_publish_client *client, const char *user,
+ char *uri, size_t size)
+{
+ struct sip_outbound_publisher *publisher;
+
+ publisher = sip_outbound_publish_client_get_publisher(client, user);
+ if (!publisher) {
+ return NULL;
+ }
+
+ ast_copy_string(uri, publisher->to_uri, size);
+ ao2_ref(publisher, -1);
+
+ return uri;
+}
+
int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
{
struct ast_sip_event_publisher_handler *existing;
ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
}
-static int sip_publish_client_service_queue(void *data)
+static int sip_publisher_service_queue(void *data)
{
- RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
- SCOPED_AO2LOCK(lock, client);
+ RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, publisher);
struct sip_outbound_publish_message *message;
pjsip_tx_data *tdata;
pj_status_t status;
- if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
+ if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) {
return 0;
}
- if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
+ if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
goto fatal;
}
goto fatal;
}
- status = pjsip_publishc_send(client->client, tdata);
+ set_transport(publisher, tdata);
+
+ status = pjsip_publishc_send(publisher->client, tdata);
if (status == PJ_EBUSY) {
/* We attempted to send the message but something else got there first */
goto service;
goto fatal;
}
- client->sending = message;
+ publisher->sending = message;
return 0;
fatal:
- AST_LIST_REMOVE_HEAD(&client->queue, entry);
+ AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
ast_free(message);
service:
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
- ao2_ref(client, -1);
+ if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
+ ao2_ref(publisher, -1);
}
return -1;
}
-int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
- const struct ast_sip_body *body)
+static int publisher_client_send(void *obj, void *arg, void *data, int flags)
{
- SCOPED_AO2LOCK(lock, client);
+ struct sip_outbound_publisher *publisher = obj;
+ const struct ast_sip_body *body = arg;
struct sip_outbound_publish_message *message;
size_t type_len = 0, subtype_len = 0, body_text_len = 0;
- int res;
+ int *res = data;
- if (!client->client) {
+ *res = -1;
+ if (!publisher->client) {
return -1;
}
message->body.body_text = strcpy(dst, body->body_text);
}
- AST_LIST_INSERT_TAIL(&client->queue, message, entry);
+ AST_LIST_INSERT_TAIL(&publisher->queue, message, entry);
- res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
- if (res) {
- ao2_ref(client, -1);
+ *res = ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher));
+ if (*res) {
+ ao2_ref(publisher, -1);
}
+ return *res;
+}
+
+int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
+ const struct ast_sip_body *body)
+{
+ SCOPED_AO2LOCK(lock, client);
+ int res = 0;
+
+ ao2_callback_data(client->publishers, OBJ_NODATA,
+ publisher_client_send, (void *)body, &res);
return res;
}
+static int sip_outbound_publisher_set_uri(
+ pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri)
+{
+ pj_str_t tmp;
+ pjsip_uri *parsed;
+ pjsip_sip_uri *parsed_uri;
+ int size;
+
+ pj_strdup2_with_null(pool, &tmp, uri);
+ if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) {
+ return -1;
+ }
+
+ if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
+ return -1;
+ }
+
+ if (!ast_strlen_zero(user)) {
+ pj_strdup2(pool, &parsed_uri->user, user);
+ }
+
+ res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size);
+ if (!res_uri->ptr) {
+ return -1;
+ }
+
+ if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr,
+ pjsip_max_url_size - 1)) <= 0) {
+ return -1;
+ }
+ res_uri->ptr[size] = '\0';
+ res_uri->slen = size;
+
+ return 0;
+}
+
+static int sip_outbound_publisher_set_uris(
+ pj_pool_t *pool, struct sip_outbound_publisher *publisher,
+ pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri)
+{
+ struct ast_sip_outbound_publish *publish = publisher->owner->publish;
+
+ if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) {
+ ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
+ publish->server_uri, ast_sorcery_object_get_id(publish));
+ return -1;
+ }
+
+ if (ast_strlen_zero(publish->to_uri)) {
+ to_uri->ptr = server_uri->ptr;
+ to_uri->slen = server_uri->slen;
+ } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) {
+ ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
+ publish->to_uri, ast_sorcery_object_get_id(publish));
+ return -1;
+ }
+
+ publisher->to_uri = ast_strdup(to_uri->ptr);
+ if (!publisher->to_uri) {
+ return -1;
+ }
+
+ if (ast_strlen_zero(publish->from_uri)) {
+ from_uri->ptr = server_uri->ptr;
+ from_uri->slen = server_uri->slen;
+ } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) {
+ ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
+ publish->from_uri, ast_sorcery_object_get_id(publish));
+ return -1;
+ }
+
+ publisher->from_uri = ast_strdup(from_uri->ptr);
+ if (!publisher->from_uri) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
+
+/*! \brief Helper function that allocates a pjsip publish client and configures it */
+static int sip_outbound_publisher_init(void *data)
+{
+ struct sip_outbound_publisher *publisher = data;
+ RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
+ pjsip_publishc_opt opt = {
+ .queue_request = PJ_FALSE,
+ };
+ pj_pool_t *pool;
+ pj_str_t event, server_uri, to_uri, from_uri;
+
+ if (publisher->client) {
+ return 0;
+ }
+
+ if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt,
+ ao2_bump(publisher), sip_outbound_publish_callback,
+ &publisher->client) != PJ_SUCCESS) {
+ ao2_ref(publisher, -1);
+ return -1;
+ }
+
+ publish = ao2_bump(publisher->owner->publish);
+
+ if (!ast_strlen_zero(publish->outbound_proxy)) {
+ pjsip_route_hdr route_set, *route;
+ static const pj_str_t ROUTE_HNAME = { "Route", 5 };
+
+ pj_list_init(&route_set);
+
+ if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME,
+ (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+ pj_list_insert_nodes_before(&route_set, route);
+
+ pjsip_publishc_set_route_set(publisher->client, &route_set);
+ }
+
+ pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation",
+ pjsip_max_url_size, pjsip_max_url_size);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
+ ast_sorcery_object_get_id(publish));
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+
+ if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) {
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+
+ pj_cstr(&event, publish->event);
+ if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri,
+ publish->expiration != PJ_SUCCESS)) {
+ ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n",
+ ast_sorcery_object_get_id(publish));
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ return 0;
+}
+
+static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags)
+{
+ return sip_outbound_publisher_init(obj);
+}
+
+static int sip_outbound_publisher_reinit_all(void *data)
+{
+ ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL);
+ return 0;
+}
+
/*! \brief Destructor function for publish client */
-static void sip_outbound_publish_client_destroy(void *obj)
+static void sip_outbound_publisher_destroy(void *obj)
{
- struct ast_sip_outbound_publish_client *client = obj;
+ struct sip_outbound_publisher *publisher = obj;
struct sip_outbound_publish_message *message;
/* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
- while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+ while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
ast_free(message);
}
- ao2_cleanup(client->datastores);
- ao2_cleanup(client->publish);
+ ao2_cleanup(publisher->owner);
+ ast_free(publisher->from_uri);
+ ast_free(publisher->to_uri);
+
+ ast_taskprocessor_unreference(publisher->serializer);
+}
+
+static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
+ struct ast_sip_outbound_publish_client *client, const char *user)
+{
+ struct sip_outbound_publisher *publisher;
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+
+ publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1,
+ sip_outbound_publisher_destroy);
+ if (!publisher) {
+ return NULL;
+ }
+
+ /*
+ * Bump the ref to the client. This essentially creates a circular reference,
+ * but it is needed in order to make sure the client object doesn't get pulled
+ * out from under us when the publisher stops publishing.
+ *
+ * The circular reference is alleviated by calling cancel_and_unpublish for
+ * each client, from the state's destructor. By calling it there all references
+ * to the publishers should go to zero, thus calling the publisher's destructor.
+ * This in turn removes the client reference we added here. The state then removes
+ * its reference to the client, which should take it to zero.
+ */
+ publisher->owner = ao2_bump(client);
+ publisher->timer.user_data = publisher;
+ publisher->timer.cb = sip_outbound_publish_timer_cb;
+ if (user) {
+ strcpy(publisher->user, user);
+ } else {
+ *publisher->user = '\0';
+ }
+
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
+ ast_sorcery_object_get_id(client->publish));
+
+ publisher->serializer = ast_sip_create_serializer_group(tps_name,
+ shutdown_group);
+ if (!publisher->serializer) {
+ ao2_ref(publisher, -1);
+ return NULL;
+ }
+
+ if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
+ ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
+ ast_sorcery_object_get_id(client->publish));
+ ao2_ref(publisher, -1);
+ return NULL;
+ }
+
+ return publisher;
+}
- /* if unloading the module and all objects have been unpublished
- send the signal to finish unloading */
- if (unloading.is_unloading) {
- ast_mutex_lock(&unloading.lock);
- if (--unloading.count == 0) {
- ast_cond_signal(&unloading.cond);
+static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
+ struct ast_sip_outbound_publish_client *client, const char *user)
+{
+ struct sip_outbound_publisher *publisher =
+ sip_outbound_publisher_alloc(client, user);
+
+ if (!publisher) {
+ return NULL;
+ }
+
+ if (!ao2_link(client->publishers, publisher)) {
+ /*
+ * No need to bump the reference here. The task will take care of
+ * removing the reference.
+ */
+ if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, publisher)) {
+ ao2_ref(publisher, -1);
}
- ast_mutex_unlock(&unloading.lock);
+ return NULL;
}
+
+ return publisher;
+}
+
+int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
+ const char *user, const struct ast_sip_body *body)
+{
+ struct sip_outbound_publisher *publisher;
+ int res;
+
+ publisher = sip_outbound_publish_client_get_publisher(client, user);
+ if (!publisher) {
+ return -1;
+ }
+
+ publisher_client_send(publisher, (void *)body, &res, 0);
+ ao2_ref(publisher, -1);
+ return res;
+}
+
+void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
+ const char *user)
+{
+ SCOPED_WRLOCK(lock, &load_lock);
+ ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
}
static int explicit_publish_destroy(void *data)
{
- struct ast_sip_outbound_publish_client *client = data;
+ struct sip_outbound_publisher *publisher = data;
- pjsip_publishc_destroy(client->client);
- ao2_ref(client, -1);
+ /*
+ * If there is no pjsip publishing client then we obviously don't need
+ * to destroy it. Also, the ref for the Asterisk publishing client that
+ * pjsip had would not exist or should already be gone as well.
+ */
+ if (publisher->client) {
+ pjsip_publishc_destroy(publisher->client);
+ ao2_ref(publisher, -1);
+ }
+
+ ao2_ref(publisher, -1);
return 0;
}
/*! \brief Helper function which cancels and un-publishes a no longer used client */
-static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
+static int cancel_and_unpublish(void *obj, void *arg, int flags)
{
- struct ast_sip_event_publisher_handler *handler;
- SCOPED_AO2LOCK(lock, client);
+ struct sip_outbound_publisher *publisher = obj;
+ struct ast_sip_outbound_publish_client *client = publisher->owner;
+
+ SCOPED_AO2LOCK(lock, publisher);
if (!client->started) {
- /* If the client was never started, there's nothing to unpublish, so just
- * destroy the publication and remove its reference to the client.
+ /* If the publisher was never started, there's nothing to unpublish, so just
+ * destroy the publication and remove its reference to the publisher.
*/
- ast_sip_push_task(NULL, explicit_publish_destroy, client);
+ if (ast_sip_push_task(publisher->serializer, explicit_publish_destroy, ao2_bump(publisher))) {
+ ao2_ref(publisher, -1);
+ }
return 0;
}
- handler = find_publisher_handler_for_event_name(client->publish->event);
- if (handler) {
- handler->stop_publishing(client);
- }
-
- client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+ if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, ao2_bump(publisher))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
- if (!client->sending) {
- if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (!publisher->sending) {
+ if (ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
}
- client->destroy = 1;
+ publisher->destroy = 1;
return 0;
}
+/*! \brief Destructor function for publish client */
+static void sip_outbound_publish_client_destroy(void *obj)
+{
+ struct ast_sip_outbound_publish_client *client = obj;
+
+ ao2_cleanup(client->datastores);
+
+ /*
+ * The client's publishers have already been unpublished and destroyed
+ * by this point, so it is safe to finally remove the reference to the
+ * publish object. The client needed to hold a reference to it until
+ * the publishers were done with it.
+ */
+ ao2_cleanup(client->publish);
+}
+
/*! \brief Destructor function for publish state */
static void sip_outbound_publish_state_destroy(void *obj)
{
struct ast_sip_outbound_publish_state *state = obj;
- cancel_and_unpublish(state->client);
+ stop_publishing(state->client, NULL);
+ /*
+ * Since the state is being destroyed the associated client needs to also
+ * be destroyed. However simply removing the reference to the client will
+ * not initiate client destruction since the client's publisher(s) hold a
+ * reference to the client object as well. So we need to unpublish the
+ * the client's publishers here, which will remove the publisher's client
+ * reference during that process.
+ *
+ * That being said we don't want to remove the client's reference to the
+ * publish object just yet. We'll hold off on that until client destruction
+ * itself. This is because the publishers need access to the client's
+ * publish object while they are unpublishing.
+ */
+ ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL);
+ ao2_cleanup(state->client->publishers);
+
+ state->client->started = 0;
ao2_cleanup(state->client);
}
return 1;
}
-static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
-
-/*! \brief Helper function that allocates a pjsip publish client and configures it */
-static int sip_outbound_publish_client_alloc(void *data)
-{
- struct ast_sip_outbound_publish_client *client = data;
- RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
- pjsip_publishc_opt opt = {
- .queue_request = PJ_FALSE,
- };
- pj_str_t event, server_uri, to_uri, from_uri;
- pj_status_t status;
-
- if (client->client) {
- return 0;
- } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
- &client->client) != PJ_SUCCESS) {
- ao2_ref(client, -1);
- return -1;
- }
-
- publish = ao2_bump(client->publish);
-
- if (!ast_strlen_zero(publish->outbound_proxy)) {
- pjsip_route_hdr route_set, *route;
- static const pj_str_t ROUTE_HNAME = { "Route", 5 };
-
- pj_list_init(&route_set);
-
- if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
- (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
- pjsip_publishc_destroy(client->client);
- return -1;
- }
- pj_list_insert_nodes_before(&route_set, route);
-
- pjsip_publishc_set_route_set(client->client, &route_set);
- }
-
- pj_cstr(&event, publish->event);
- pj_cstr(&server_uri, publish->server_uri);
- pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
- pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
-
- status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
- publish->expiration);
- if (status == PJSIP_EINVALIDURI) {
- pj_pool_t *pool;
- pj_str_t tmp;
- pjsip_uri *uri;
-
- pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
- if (!pool) {
- ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
- ast_sorcery_object_get_id(publish));
- pjsip_publishc_destroy(client->client);
- return -1;
- }
-
- pj_strdup2_with_null(pool, &tmp, publish->server_uri);
- uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
- if (!uri) {
- ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
- publish->server_uri, ast_sorcery_object_get_id(publish));
- }
-
- if (!ast_strlen_zero(publish->to_uri)) {
- pj_strdup2_with_null(pool, &tmp, publish->to_uri);
- uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
- if (!uri) {
- ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
- publish->to_uri, ast_sorcery_object_get_id(publish));
- }
- }
-
- if (!ast_strlen_zero(publish->from_uri)) {
- pj_strdup2_with_null(pool, &tmp, publish->from_uri);
- uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
- if (!uri) {
- ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
- publish->from_uri, ast_sorcery_object_get_id(publish));
- }
- }
-
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
- pjsip_publishc_destroy(client->client);
- return -1;
- } else if (status != PJ_SUCCESS) {
- pjsip_publishc_destroy(client->client);
- return -1;
- }
-
- return 0;
-}
-
/*! \brief Callback function for publish client responses */
static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
{
- RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
- RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
- SCOPED_AO2LOCK(lock, client);
+#define DESTROY_CLIENT() do { \
+ pjsip_publishc_destroy(publisher->client); \
+ publisher->client = NULL; \
+ ao2_ref(publisher, -1); } while (0)
+
+ RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup);
+ RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup);
+ SCOPED_AO2LOCK(lock, publisher);
pjsip_tx_data *tdata;
- if (client->destroy) {
- if (client->sending) {
- client->sending = NULL;
+ if (publisher->destroy) {
+ if (publisher->sending) {
+ publisher->sending = NULL;
- if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (!ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
return;
}
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
- /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
- pjsip_publishc_destroy(client->client);
- ao2_ref(client, -1);
+ /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */
+ DESTROY_CLIENT();
return;
}
if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
param->rdata, tsx->last_tx, &tdata)) {
- pjsip_publishc_send(client->client, tdata);
+ set_transport(publisher, tdata);
+ pjsip_publishc_send(publisher->client, tdata);
}
- client->auth_attempts++;
-
- if (client->auth_attempts == publish->max_auth_attempts) {
- pjsip_publishc_destroy(client->client);
- client->client = NULL;
+ publisher->auth_attempts++;
+ if (publisher->auth_attempts == publish->max_auth_attempts) {
+ DESTROY_CLIENT();
ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
return;
}
- client->auth_attempts = 0;
+ publisher->auth_attempts = 0;
if (param->code == 412) {
- pjsip_publishc_destroy(client->client);
- client->client = NULL;
-
- if (sip_outbound_publish_client_alloc(publish)) {
+ DESTROY_CLIENT();
+ if (sip_outbound_publisher_init(publisher)) {
ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
ast_sorcery_object_get_id(publish));
goto end;
}
/* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
- client->sending = NULL;
+ publisher->sending = NULL;
} else if (param->code == 423) {
/* Update the expiration with the new expiration time if available */
pjsip_expires_hdr *expires;
expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
if (!expires || !expires->ivalue) {
+ DESTROY_CLIENT();
ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
ast_sorcery_object_get_id(publish));
- pjsip_publishc_destroy(client->client);
- client->client = NULL;
goto end;
}
- pjsip_publishc_update_expires(client->client, expires->ivalue);
- client->sending = NULL;
- } else if (client->sending) {
+ pjsip_publishc_update_expires(publisher->client, expires->ivalue);
+ publisher->sending = NULL;
+ } else if (publisher->sending) {
/* Remove the message currently being sent so that when the queue is serviced another will get sent */
- AST_LIST_REMOVE_HEAD(&client->queue, entry);
- ast_free(client->sending);
- client->sending = NULL;
+ AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
+ ast_free(publisher->sending);
+ publisher->sending = NULL;
if (!param->rdata) {
ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
}
}
- if (AST_LIST_EMPTY(&client->queue)) {
- schedule_publish_refresh(client, param->expiration);
+ if (AST_LIST_EMPTY(&publisher->queue)) {
+ schedule_publish_refresh(publisher, param->expiration);
}
end:
- if (!client->client) {
+ if (!publisher->client) {
struct sip_outbound_publish_message *message;
- while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+ while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
ast_free(message);
}
} else {
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
- ao2_ref(client, -1);
+ if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
+ ao2_ref(publisher, -1);
}
}
}
return NULL;
}
- state->client->timer.user_data = state->client;
- state->client->timer.cb = sip_outbound_publish_timer_cb;
+ state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn,
+ sip_outbound_publisher_cmp_fn);
+ if (!state->client->publishers) {
+ ao2_ref(state, -1);
+ return NULL;
+ }
+
state->client->publish = ao2_bump(publish);
strcpy(state->id, id);
return state;
}
-/*! \brief Apply function which finds or allocates a state structure */
-static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
+static int validate_publish_config(struct ast_sip_outbound_publish *publish)
{
- RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
- RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
- struct ast_sip_outbound_publish *applied = obj;
-
- if (ast_strlen_zero(applied->server_uri)) {
+ if (ast_strlen_zero(publish->server_uri)) {
ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
- ast_sorcery_object_get_id(applied));
+ ast_sorcery_object_get_id(publish));
return -1;
- } else if (ast_strlen_zero(applied->event)) {
+ } else if (ast_strlen_zero(publish->event)) {
ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
- ast_sorcery_object_get_id(applied));
+ ast_sorcery_object_get_id(publish));
return -1;
}
+ return 0;
+}
+
+static int current_state_reusable(struct ast_sip_outbound_publish *publish,
+ struct ast_sip_outbound_publish_state *current_state)
+{
+ struct ast_sip_outbound_publish *old_publish;
+ /*
+ * Don't maintain the old state/client objects if the multi_user option changed.
+ */
+ if ((!publish->multi_user && current_state->client->publish->multi_user) ||
+ (publish->multi_user && !current_state->client->publish->multi_user)) {
+ return 0;
+ }
+
+
+ if (!can_reuse_publish(current_state->client->publish, publish)) {
+ /*
+ * Something significant has changed in the configuration, so we are
+ * unable to use the old state object. The current state needs to go
+ * away and a new one needs to be created.
+ */
+ return 0;
+ }
+
+ /*
+ * We can reuse the current state object so keep it, but swap out the
+ * underlying publish object with the new one.
+ */
+ old_publish = current_state->client->publish;
+ current_state->client->publish = publish;
+ if (ast_sip_push_task_synchronous(
+ NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
+ /*
+ * If the state object fails to re-initialize then swap
+ * the old publish info back in.
+ */
+ current_state->client->publish = publish;
+ ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n",
+ ast_sorcery_object_get_id(current_state->client->publish));
+ return -1;
+ }
+
+ /*
+ * Since we swapped out the publish object the new one needs a ref
+ * while the old one needs to go away.
+ */
+ ao2_ref(current_state->client->publish, +1);
+ ao2_cleanup(old_publish);
+
+ /* Tell the caller that the current state object should be used */
+ return 1;
+}
+
+/*! \brief Apply function which finds or allocates a state structure */
+static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
+{
+#define ADD_TO_NEW_STATES(__obj) \
+ do { if (__obj) { \
+ ao2_link(new_states, __obj); \
+ ao2_ref(__obj, -1); } } while (0)
+
+ struct ast_sip_outbound_publish *applied = obj;
+ struct ast_sip_outbound_publish_state *current_state, *new_state;
+ struct sip_outbound_publisher *publisher = NULL;
+ int res;
+
+ /*
+ * New states are being loaded or reloaded. We'll need to add the new
+ * object if created/updated, or keep the old object if an error occurs.
+ */
if (!new_states) {
- /* make sure new_states has been allocated as we will be adding to it */
new_states = ao2_container_alloc_options(
AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
outbound_publish_state_hash, outbound_publish_state_cmp);
}
}
- if (states) {
- state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
- if (state) {
- if (can_reuse_publish(state->client->publish, applied)) {
- ao2_replace(state->client->publish, applied);
- } else {
- ao2_ref(state, -1);
- state = NULL;
- }
- }
+ /* If there is current state we'll want to maintain it if any errors occur */
+ current_state = sip_publish_state_get(ast_sorcery_object_get_id(applied));
+
+ if ((res = validate_publish_config(applied))) {
+ ADD_TO_NEW_STATES(current_state);
+ return res;
}
- if (!state) {
- state = sip_outbound_publish_state_alloc(applied);
- if (!state) {
- ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
- ast_sorcery_object_get_id(applied));
- return -1;
- };
+ if (current_state && (res = current_state_reusable(applied, current_state))) {
+ /*
+ * The current state object was able to be reused, or an error
+ * occurred. Either way we keep the current state and be done.
+ */
+ ADD_TO_NEW_STATES(current_state);
+ return res == 1 ? 0 : -1;
}
- if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
- ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
+ /*
+ * No current state was found or it was unable to be reused. Either way
+ * we'll need to create a new state object.
+ */
+ new_state = sip_outbound_publish_state_alloc(applied);
+ if (!new_state) {
+ ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
ast_sorcery_object_get_id(applied));
+ ADD_TO_NEW_STATES(current_state);
+ return -1;
+ };
+
+ if (!applied->multi_user &&
+ !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) {
+ ADD_TO_NEW_STATES(current_state);
+ ao2_ref(new_state, -1);
return -1;
}
+ ao2_cleanup(publisher);
- ao2_link(new_states, state);
- return 0;
+ ADD_TO_NEW_STATES(new_state);
+ ao2_cleanup(current_state);
+ return res;
}
static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
}
+
+static int unload_module(void)
+{
+ int remaining;
+
+ ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
+
+ ao2_global_obj_release(current_states);
+
+ /* Wait for publication serializers to get destroyed. */
+ ast_debug(2, "Waiting for publication to complete for unload.\n");
+ remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
+ if (remaining) {
+ ast_log(LOG_WARNING, "Unload incomplete. Could not stop %d outbound publications. Try again later.\n",
+ remaining);
+ return -1;
+ }
+
+ ast_debug(2, "Successful shutdown.\n");
+
+ ao2_cleanup(shutdown_group);
+ shutdown_group = NULL;
+
+ return 0;
+}
+
static int load_module(void)
{
CHECK_PJSIP_MODULE_LOADED();
+ /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
+ ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size);
+
+ shutdown_group = ast_serializer_shutdown_group_alloc();
+ if (!shutdown_group) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
sip_outbound_publish_apply)) {
+ ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
+ unload_module();
return AST_MODULE_LOAD_DECLINE;
}
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
+ ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
+ ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
return 0;
}
-static int unload_module(void)
-{
- struct timeval start = ast_tvnow();
- struct timespec end = {
- .tv_sec = start.tv_sec + 10,
- .tv_nsec = start.tv_usec * 1000
- };
- int res = 0;
- struct ao2_container *states = ao2_global_obj_ref(current_states);
-
- if (!states || !(unloading.count = ao2_container_count(states))) {
- return 0;
- }
- ao2_ref(states, -1);
-
- ast_mutex_init(&unloading.lock);
- ast_cond_init(&unloading.cond, NULL);
- ast_mutex_lock(&unloading.lock);
-
- unloading.is_unloading = 1;
- ao2_global_obj_release(current_states);
-
- /* wait for items to unpublish */
- ast_verb(5, "Waiting to complete unpublishing task(s)\n");
- while (unloading.count) {
- res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
- }
- ast_mutex_unlock(&unloading.lock);
-
- ast_mutex_destroy(&unloading.lock);
- ast_cond_destroy(&unloading.cond);
-
- if (res) {
- ast_verb(5, "At least %d items were unable to unpublish "
- "in the allowed time\n", unloading.count);
- } else {
- ast_verb(5, "All items successfully unpublished\n");
- }
-
- return res;
-}
-
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
- .load = load_module,
- .reload = reload_module,
- .unload = unload_module,
- .load_pri = AST_MODPRI_CHANNEL_DEPEND,
- );
+ .load = load_module,
+ .reload = reload_module,
+ .unload = unload_module,
+ .load_pri = AST_MODPRI_CHANNEL_DEPEND,
+);