2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2014, Digium, Inc.
6 * Joshua Colp <jcolp@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
20 <depend>pjproject</depend>
21 <depend>res_pjsip</depend>
22 <support_level>core</support_level>
28 #include <pjsip_simple.h>
30 #include "asterisk/res_pjsip.h"
31 #include "asterisk/res_pjsip_outbound_publish.h"
32 #include "asterisk/module.h"
33 #include "asterisk/taskprocessor.h"
34 #include "asterisk/datastore.h"
37 <configInfo name="res_pjsip_outbound_publish" language="en_US">
38 <synopsis>SIP resource for outbound publish</synopsis>
40 <emphasis>Outbound Publish</emphasis>
42 <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
44 <configFile name="pjsip.conf">
45 <configObject name="outbound-publish">
46 <synopsis>The configuration for outbound publish</synopsis>
48 Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
49 <literal>pjsip.conf</literal>. A minimal configuration consists of
50 setting a <literal>server_uri</literal> and <literal>event</literal>.
52 <configOption name="expiration" default="3600">
53 <synopsis>Expiration time for publications in seconds</synopsis>
55 <configOption name="outbound_auth" default="">
56 <synopsis>Authentication object to be used for outbound publishes.</synopsis>
58 <configOption name="outbound_proxy" default="">
59 <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
61 <configOption name="server_uri">
62 <synopsis>SIP URI of the server and entity to publish to</synopsis>
64 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
65 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
68 <configOption name="from_uri">
69 <synopsis>SIP URI to use in the From header</synopsis>
71 This is the URI that will be placed into the From header of outgoing PUBLISH
72 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
76 <configOption name="to_uri">
77 <synopsis>SIP URI to use in the To header</synopsis>
79 This is the URI that will be placed into the To header of outgoing PUBLISH
80 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
84 <configOption name="event" default="">
85 <synopsis>Event type of the PUBLISH.</synopsis>
87 <configOption name="max_auth_attempts" default="5">
88 <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
90 <configOption name="type">
91 <synopsis>Must be of type 'outbound-publish'.</synopsis>
98 /*! \brief Queued outbound publish message (one entry in a client's outgoing queue) */
99 struct sip_outbound_publish_message {
100 /*! \brief Optional body; when a body was supplied its type/subtype/body_text point into body_contents */
101 struct ast_sip_body body;
102 /*! \brief Linked list information */
103 AST_LIST_ENTRY(sip_outbound_publish_message) entry;
104 /*! \brief Extra space for body contents, allocated together with the struct by ast_sip_publish_client_send() */
105 char body_contents[0];
108 /*! \brief Outbound publish information (the sorcery "outbound-publish" configuration object) */
109 struct ast_sip_outbound_publish {
110 /*! \brief Sorcery object details */
111 SORCERY_OBJECT(details);
112 /*! \brief Stringfields */
113 AST_DECLARE_STRING_FIELDS(
114 /*! \brief URI for the entity and server */
115 AST_STRING_FIELD(server_uri);
116 /*! \brief URI for the From header (server_uri is used when this is empty) */
117 AST_STRING_FIELD(from_uri);
118 /*! \brief URI for the To header (server_uri is used when this is empty) */
119 AST_STRING_FIELD(to_uri);
120 /*! \brief Outbound proxy to use */
121 AST_STRING_FIELD(outbound_proxy);
122 /*! \brief The event type to publish */
123 AST_STRING_FIELD(event);
125 /*! \brief Requested expiration time (handed to pjsip_publishc_init() when the client is set up) */
126 unsigned int expiration;
127 /*! \brief Maximum number of auth attempts before stopping the publish client */
128 unsigned int max_auth_attempts;
129 /*! \brief Configured authentication credentials */
130 struct ast_sip_auth_vector outbound_auths;
133 /*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
134 struct ast_sip_outbound_publish_client {
135 /*! \brief Underlying publish client (set to NULL in the response callback after the pjsip client is destroyed) */
136 pjsip_publishc *client;
137 /*! \brief Timer entry for refreshing publish (callback is sip_outbound_publish_timer_cb) */
138 pj_timer_entry timer;
139 /*! \brief Publisher datastores set up by handlers */
140 struct ao2_container *datastores;
141 /*! \brief The number of auth attempts done */
142 unsigned int auth_attempts;
143 /*! \brief Queue of outgoing publish messages to send */
144 AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
145 /*! \brief The message currently being sent (NULL when nothing is in flight) */
146 struct sip_outbound_publish_message *sending;
147 /*! \brief Publish client has been fully started and event type informed */
148 unsigned int started;
149 /*! \brief Publish client should be destroyed */
150 unsigned int destroy;
151 /*! \brief Outbound publish information */
152 struct ast_sip_outbound_publish *publish;
155 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
156 struct ast_sip_outbound_publish_state {
157 /*! \brief Outbound publish client */
158 struct ast_sip_outbound_publish_client *client;
159 /* Publish state id lookup key - same as the publish configuration (sorcery) id; used as the hash/compare key for the state containers */
163 /*! \brief Unloading data - the file's "unloading" instance (is_unloading, count, lock, cond) is used by unload_module() and the client destructor to wait for outstanding clients */
164 struct unloading_data {
171 /*! \brief Default number of client state container buckets */
172 #define DEFAULT_STATE_BUCKETS 31
/*! \brief Global holder for the container of currently active publish states */
173 static AO2_GLOBAL_OBJ_STATIC(current_states);
174 /*! \brief Used on [re]loads to hold new state data; swapped into current_states by get_publishes_and_update_state() */
175 static struct ao2_container *new_states;
177 /*!
 * \brief Hashing function for state objects
 *
 * \param obj Lookup argument; how it is interpreted depends on the search flags
 * \param flags astobj2 search flags (only the OBJ_SEARCH_MASK bits are examined)
 *
 * \return ast_str_hash() of the lookup key
 */
178 static int outbound_publish_state_hash(const void *obj, const int flags)
180 const struct ast_sip_outbound_publish_state *object;
183 switch (flags & OBJ_SEARCH_MASK) {
187 case OBJ_SEARCH_OBJECT:
195 return ast_str_hash(key);
198 /*!
 * \brief Comparator function for state objects
 *
 * \param obj State object stored in the container
 * \param arg Either another state object or a key string, depending on flags
 * \param flags astobj2 search flags (only the OBJ_SEARCH_MASK bits are examined)
 *
 * Ids are compared with strcmp(); partial key searches are not supported.
 */
199 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
201 const struct ast_sip_outbound_publish_state *object_left = obj;
202 const struct ast_sip_outbound_publish_state *object_right = arg;
203 const char *right_key = arg;
206 switch (flags & OBJ_SEARCH_MASK) {
207 case OBJ_SEARCH_OBJECT:
208 right_key = object_right->id; /* arg is a full object: compare against its id */
211 cmp = strcmp(object_left->id, right_key);
213 case OBJ_SEARCH_PARTIAL_KEY:
214 /* Not supported by container. */
/*!
 * \brief Retrieve all outbound-publish objects from sorcery and promote new_states
 *
 * Fetches every "outbound-publish" object, then replaces the current_states
 * global with new_states and releases this file's own reference to new_states.
 * NOTE(review): the condition guarding the swap and any reset of new_states are
 * not visible here - confirm against the full source.
 */
227 static struct ao2_container *get_publishes_and_update_state(void)
229 struct ao2_container *container;
231 container = ast_sorcery_retrieve_by_fields(
232 ast_sip_get_sorcery(), "outbound-publish",
233 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
239 ao2_global_obj_replace_unref(current_states, new_states);
240 ao2_cleanup(new_states);
/*! \brief Read/write-locked list of registered event publisher handlers */
246 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
/*!
 * \brief Append a handler to publisher_handlers and take a module reference
 *
 * No locking is done here; the visible caller holds the list write lock.
 */
248 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
250 AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
251 ast_module_ref(ast_module_info->self);
/*!
 * \brief Look up a registered publisher handler by exact event name
 *
 * Walks publisher_handlers comparing event names with strcmp(). No locking is
 * done here.
 */
254 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
256 struct ast_sip_event_publisher_handler *iter;
258 AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
259 if (!strcmp(iter->event_name, event_name)) {
266 /*! \brief Helper function which cancels the refresh timer on a client (cancels client->timer on the pjsip endpoint's timer heap) */
267 static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
269 if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
270 /* The timer was successfully cancelled, drop the refcount of the client */
275 /*!
 * \brief Helper function which sets up the timer to send publication
 *
 * \param client The publish client whose refresh timer is (re)scheduled
 * \param expiration Expiration in seconds; only used when greater than zero
 *
 * Any pending refresh timer is cancelled first.
 */
276 static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration)
278 struct ast_sip_outbound_publish *publish = ao2_bump(client->publish); /* hold the config for the duration of this call */
279 pj_time_val delay = { .sec = 0, };
281 cancel_publish_refresh(client);
283 if (expiration > 0) {
284 delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; /* refresh slightly before the publication expires */
286 if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
287 delay.sec = publish->expiration; /* use the configured expiration when the delay exceeds it or is zero */
289 if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
290 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; /* enforce a minimum delay */
294 if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
295 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
298 ao2_ref(publish, -1); /* release the reference taken at the top */
301 /*! \brief Publish client timer callback function (entry->user_data is the publish client, as set in sip_outbound_publish_state_alloc()) */
302 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
304 struct ast_sip_outbound_publish_client *client = entry->user_data;
307 if (AST_LIST_EMPTY(&client->queue)) {
308 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
309 ast_sip_publish_client_send(client, NULL);
316 /*! \brief Task for cancelling a refresh timer (data is the publish client; callers push it with ao2_bump(), so a reference is held for the task) */
317 static int cancel_refresh_timer_task(void *data)
319 struct ast_sip_outbound_publish_client *client = data;
321 cancel_publish_refresh(client);
327 /*! \brief Task for sending an unpublish (data is the publish client; callers push it with ao2_bump(), so a reference is held for the task) */
328 static int send_unpublish_task(void *data)
330 struct ast_sip_outbound_publish_client *client = data;
331 pjsip_tx_data *tdata;
333 if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
334 pjsip_publishc_send(client->client, tdata); /* only sent when the unpublish request was created successfully */
342 /*!
 * \brief Helper function which starts or stops publish clients when applicable
 *
 * \param removed Handler that is being unregistered, or NULL when none is
 *
 * Iterates every current state: clients that are not yet started are started
 * through the handler registered for their event; started clients whose event
 * matches the removed handler are stopped and their refresh timer cancelled.
 * The visible callers hold a lock on publisher_handlers while calling this.
 */
343 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
345 RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
346 struct ao2_container *states;
347 struct ao2_iterator i;
348 struct ast_sip_outbound_publish_state *state;
354 states = ao2_global_obj_ref(current_states);
359 i = ao2_iterator_init(states, 0);
360 while ((state = ao2_iterator_next(&i))) {
361 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
362 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
364 if (!state->client->started) {
365 /* If the publisher client has not yet been started try to start it */
367 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
368 publish->event, ast_sorcery_object_get_id(publish));
369 } else if (handler->start_publishing(publish, state->client)) {
370 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
371 publish->event, ast_sorcery_object_get_id(publish));
373 state->client->started = 1;
375 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
376 /* If the publisher client has been started but it is going away stop it */
377 removed->stop_publishing(state->client);
378 state->client->started = 0;
379 if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
380 ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
381 ast_sorcery_object_get_id(publish));
382 ao2_ref(state->client, -1); /* task was not queued: drop the reference bumped for it */
385 ao2_ref(publish, -1);
388 ao2_iterator_destroy(&i);
/*!
 * \brief Find the publish client for a named outbound publish
 *
 * \param name Id of the outbound publish (the state lookup key)
 *
 * \return The client with its reference count bumped; the caller must release it.
 *         NOTE(review): the not-found paths are not visible here; presumably NULL.
 */
392 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
394 RAII_VAR(struct ao2_container *, states,
395 ao2_global_obj_ref(current_states), ao2_cleanup);
396 RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
402 state = ao2_find(states, name, OBJ_SEARCH_KEY);
407 ao2_ref(state->client, +1); /* reference handed to the caller; the state reference is dropped by RAII */
408 return state->client;
/*!
 * \brief Register an event publisher handler
 *
 * \param handler Handler to register; it must provide start_publishing and
 *        stop_publishing callbacks and a non-empty event_name
 *
 * Registration is refused, with an error logged, when a required callback or
 * the event name is missing, or when a handler for the same event already
 * exists. On success the handler is added to the list and the publish clients
 * are synchronized. The handler list is write-locked for the whole call.
 */
411 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
413 struct ast_sip_event_publisher_handler *existing;
414 SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
416 if (!handler->start_publishing || !handler->stop_publishing) {
417 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
419 } else if (ast_strlen_zero(handler->event_name)) {
420 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
424 existing = find_publisher_handler_for_event_name(handler->event_name);
426 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
427 "A handler is already registered\n", handler->event_name);
431 sub_add_handler(handler);
433 sip_outbound_publish_synchronize(NULL);
/*!
 * \brief Unregister an event publisher handler
 *
 * \param handler Handler to remove (matched by pointer identity)
 *
 * Removes the handler from the list, releases the module reference taken at
 * registration, then synchronizes so that started clients using this handler's
 * event are stopped. The handler list is write-locked for the whole call.
 */
438 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
440 struct ast_sip_event_publisher_handler *iter;
441 SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
442 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
443 if (handler == iter) {
444 AST_RWLIST_REMOVE_CURRENT(next);
445 ast_module_unref(ast_module_info->self);
449 AST_RWLIST_TRAVERSE_SAFE_END;
451 sip_outbound_publish_synchronize(handler);
454 /*! \brief Destructor function for publish information (frees the auth vector and the string field storage) */
455 static void sip_outbound_publish_destroy(void *obj)
457 struct ast_sip_outbound_publish *publish = obj;
459 ast_sip_auth_vector_destroy(&publish->outbound_auths);
461 ast_string_field_free_memory(publish);
464 /*!
 * \brief Allocator function for publish information
 *
 * \param name Object name supplied by sorcery (not referenced in the visible lines)
 *
 * Allocates the sorcery object and initializes its string fields; the object
 * is released again if either step fails.
 */
465 static void *sip_outbound_publish_alloc(const char *name)
467 struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
468 sip_outbound_publish_destroy);
470 if (!publish || ast_string_field_init(publish, 256)) {
471 ao2_cleanup(publish);
/*! \brief Destructor for publish client datastores: destroys the payload through the info callback and frees the uid */
478 static void sip_outbound_publish_datastore_destroy(void *obj)
480 struct ast_datastore *datastore = obj;
482 /* Using the destroy function (if present) destroy the data */
483 if (datastore->info->destroy != NULL && datastore->data != NULL) {
484 datastore->info->destroy(datastore->data);
485 datastore->data = NULL;
488 ast_free((void *) datastore->uid); /* cast away const: the uid was allocated with ast_strdup() by the datastore allocator */
489 datastore->uid = NULL;
/*!
 * \brief Allocate a reference-counted datastore for use with a publish client
 *
 * \param info Datastore information, stored as datastore->info
 * \param uid Unique id for the datastore; a UUID string is generated when empty
 *
 * NOTE(review): the failure paths and the final return are not visible here;
 * presumably NULL on failure and the new datastore on success.
 */
492 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
494 RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
495 const char *uid_ptr = uid;
496 char uuid_buf[AST_UUID_STR_LEN];
502 datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
507 datastore->info = info;
508 if (ast_strlen_zero(uid)) {
509 /* They didn't provide an ID so we'll provide one ourself */
510 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
513 datastore->uid = ast_strdup(uid_ptr);
514 if (!datastore->uid) {
518 ao2_ref(datastore, +1); /* extra reference so the object outlives the RAII cleanup of the local variable */
/*!
 * \brief Link a datastore into a publish client's datastore container
 *
 * \param client Publish client that receives the datastore
 * \param datastore Datastore to add; it must have info and a non-empty uid (asserted)
 */
522 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
523 struct ast_datastore *datastore)
525 ast_assert(datastore != NULL);
526 ast_assert(datastore->info != NULL);
527 ast_assert(!ast_strlen_zero(datastore->uid));
529 if (!ao2_link(client->datastores, datastore)) {
/*! \brief Find a datastore on a publish client by key; returns the result of ao2_find() on the client's datastore container */
535 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
538 return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
/*! \brief Unlink the datastore with the given key from a publish client's datastore container */
541 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
544 ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA); /* OBJ_NODATA: no object (and no reference) is returned */
/*!
 * \brief Task which sends the message at the head of a client's queue
 *
 * \param data The publish client; the reference passed in is released on return (RAII)
 *
 * Runs with the client locked. Nothing is done when the client is being
 * destroyed, a message is already in flight, or the queue is empty. Otherwise
 * a PUBLISH request is created, the message body is attached when both type
 * and subtype are set, and the request is sent.
 */
547 static int sip_publish_client_service_queue(void *data)
549 RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
550 SCOPED_AO2LOCK(lock, client);
551 struct sip_outbound_publish_message *message;
552 pjsip_tx_data *tdata;
555 if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
559 if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
563 if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
564 ast_sip_add_body(tdata, &message->body)) {
565 pjsip_tx_data_dec_ref(tdata); /* body could not be added: release the request */
569 status = pjsip_publishc_send(client->client, tdata);
570 if (status == PJ_EBUSY) {
571 /* We attempted to send the message but something else got there first */
573 } else if (status != PJ_SUCCESS) {
577 client->sending = message; /* marks the message as in flight; cleared by the response callback */
582 AST_LIST_REMOVE_HEAD(&client->queue, entry);
586 if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
/*!
 * \brief Queue a PUBLISH for sending on a publish client
 *
 * \param client Publish client to send on (locked for the duration of the call)
 * \param body Optional body; when present its type, subtype and text are copied
 *        into storage allocated together with the queued message
 *
 * The message is appended to the client's queue and a task is pushed to
 * service the queue.
 */
592 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
593 const struct ast_sip_body *body)
595 SCOPED_AO2LOCK(lock, client);
596 struct sip_outbound_publish_message *message;
597 size_t type_len = 0, subtype_len = 0, body_text_len = 0;
600 if (!client->client) {
604 /* If a body is present we need more space for the contents of it */
606 type_len = strlen(body->type) + 1;
607 subtype_len = strlen(body->subtype) + 1;
608 body_text_len = strlen(body->body_text) + 1;
611 message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
617 char *dst = message->body_contents;
619 message->body.type = strcpy(dst, body->type);
621 message->body.subtype = strcpy(dst, body->subtype);
623 message->body.body_text = strcpy(dst, body->body_text);
626 AST_LIST_INSERT_TAIL(&client->queue, message, entry);
628 res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
636 /*!
 * \brief Destructor function for publish client
 *
 * Drains the message queue, releases the datastores container and the publish
 * configuration and, while the module is unloading, decrements the count of
 * outstanding clients and signals the waiter when it reaches zero.
 */
637 static void sip_outbound_publish_client_destroy(void *obj)
639 struct ast_sip_outbound_publish_client *client = obj;
640 struct sip_outbound_publish_message *message;
642 /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
644 while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
648 ao2_cleanup(client->datastores);
649 ao2_cleanup(client->publish);
651 /* if unloading the module and all objects have been unpublished
652 send the signal to finish unloading */
653 if (unloading.is_unloading) {
654 ast_mutex_lock(&unloading.lock);
655 if (--unloading.count == 0) {
656 ast_cond_signal(&unloading.cond);
658 ast_mutex_unlock(&unloading.lock);
/*! \brief Task which destroys the underlying pjsip publish client of a client that was never started (data is the publish client) */
662 static int explicit_publish_destroy(void *data)
664 struct ast_sip_outbound_publish_client *client = data;
666 pjsip_publishc_destroy(client->client);
672 /*!
 * \brief Helper function which cancels and un-publishes a no longer used client
 *
 * \param client The client to shut down (locked for the duration of the call)
 *
 * A client that was never started only has its pjsip publication destroyed.
 * Otherwise the event handler is told to stop publishing, the refresh timer
 * is cancelled and, when no message is in flight, an unpublish is sent.
 */
673 static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
675 struct ast_sip_event_publisher_handler *handler;
676 SCOPED_AO2LOCK(lock, client);
678 if (!client->started) {
679 /* If the client was never started, there's nothing to unpublish, so just
680 * destroy the publication and remove its reference to the client.
682 ast_sip_push_task(NULL, explicit_publish_destroy, client);
686 handler = find_publisher_handler_for_event_name(client->publish->event);
688 handler->stop_publishing(client);
692 if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
693 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
694 ast_sorcery_object_get_id(client->publish));
698 /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
699 if (!client->sending) {
700 if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
701 ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
702 ast_sorcery_object_get_id(client->publish));
710 /*! \brief Destructor function for publish state (cancels/unpublishes the client, then releases the state's reference to it) */
711 static void sip_outbound_publish_state_destroy(void *obj)
713 struct ast_sip_outbound_publish_state *state = obj;
715 cancel_and_unpublish(state->client);
716 ao2_cleanup(state->client);
721 * \brief Check if a publish can be reused
723 * This checks if the existing outbound publish's configuration differs from a newly-applied
726 * \param existing The pre-existing outbound publish
727 * \param applied The newly-created publish
729 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
/* Compared: server_uri, from_uri, to_uri, outbound_proxy, event and the number of outbound auths */
733 if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
734 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
735 strcmp(existing->event, applied->event) ||
736 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
/* Same count: the auth names must also match position by position */
740 for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
741 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
749 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
751 /*!
 * \brief Helper function that allocates a pjsip publish client and configures it
 *
 * \param data The ast_sip_outbound_publish_client to set up (NOT the publish
 *        configuration object)
 *
 * Does nothing further when client->client already exists. Otherwise creates
 * the pjsip publish client with the client as callback token, applies the
 * optional outbound proxy as a route set, and initializes it with the event,
 * server URI, From/To URIs and expiration from the configuration. When
 * initialization reports an invalid URI, each configured URI is parsed
 * individually so the offending one can be logged.
 */
752 static int sip_outbound_publish_client_alloc(void *data)
754 struct ast_sip_outbound_publish_client *client = data;
755 RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
756 pjsip_publishc_opt opt = {
757 .queue_request = PJ_FALSE,
759 pj_str_t event, server_uri, to_uri, from_uri;
762 if (client->client) {
764 } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
765 &client->client) != PJ_SUCCESS) {
770 publish = ao2_bump(client->publish);
772 if (!ast_strlen_zero(publish->outbound_proxy)) {
773 pjsip_route_hdr route_set, *route;
774 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
776 pj_list_init(&route_set);
778 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
779 (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
780 pjsip_publishc_destroy(client->client);
783 pj_list_insert_nodes_before(&route_set, route);
785 pjsip_publishc_set_route_set(client->client, &route_set);
788 pj_cstr(&event, publish->event);
789 pj_cstr(&server_uri, publish->server_uri);
790 pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri)); /* To falls back to the server URI */
791 pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri)); /* From falls back to the server URI */
793 status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
794 publish->expiration);
795 if (status == PJSIP_EINVALIDURI) {
800 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
802 ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
803 ast_sorcery_object_get_id(publish));
804 pjsip_publishc_destroy(client->client);
808 pj_strdup2_with_null(pool, &tmp, publish->server_uri);
809 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
811 ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
812 publish->server_uri, ast_sorcery_object_get_id(publish));
815 if (!ast_strlen_zero(publish->to_uri)) {
816 pj_strdup2_with_null(pool, &tmp, publish->to_uri);
817 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
819 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
820 publish->to_uri, ast_sorcery_object_get_id(publish));
824 if (!ast_strlen_zero(publish->from_uri)) {
825 pj_strdup2_with_null(pool, &tmp, publish->from_uri);
826 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
828 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
829 publish->from_uri, ast_sorcery_object_get_id(publish));
833 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
834 pjsip_publishc_destroy(client->client);
836 } else if (status != PJ_SUCCESS) {
837 pjsip_publishc_destroy(client->client);
844 /*!
 * \brief Callback function for publish client responses
 *
 * \param param Callback data from the pjsip publish client. param->token is the
 *        ast_sip_outbound_publish_client that was handed to
 *        pjsip_publishc_create() when the pjsip client was created.
 *
 * Runs with the client locked and handles, in order:
 * - a client marked for destruction (send the pending unpublish or destroy);
 * - 401/407 authentication challenges, up to max_auth_attempts;
 * - 412: the pjsip client is destroyed and re-created, then the same message is re-sent;
 * - 423: the expiration is updated from the Min-Expires header and the message is re-sent;
 * - any other response: the in-flight message is removed from the queue and freed.
 * Afterwards the refresh timer is scheduled when the queue is empty, the queue
 * is flushed when the pjsip client is gone, or the queue is serviced again.
 */
845 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
847 RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
848 RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
849 SCOPED_AO2LOCK(lock, client);
850 pjsip_tx_data *tdata;
852 if (client->destroy) {
853 if (client->sending) {
854 client->sending = NULL;
856 if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
859 ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
860 ast_sorcery_object_get_id(publish));
863 /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
864 pjsip_publishc_destroy(client->client);
869 if (param->code == 401 || param->code == 407) {
870 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
871 param->rdata, pjsip_rdata_get_tsx(param->rdata), &tdata)) {
872 pjsip_publishc_send(client->client, tdata);
874 client->auth_attempts++;
876 if (client->auth_attempts == publish->max_auth_attempts) {
877 pjsip_publishc_destroy(client->client);
878 client->client = NULL;
880 ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
881 ast_sorcery_object_get_id(publish));
888 client->auth_attempts = 0;
890 if (param->code == 412) {
891 pjsip_publishc_destroy(client->client);
892 client->client = NULL;
/* The allocator interprets its argument as the publish client, so it must be given the client and not the publish configuration */
894 if (sip_outbound_publish_client_alloc(client)) {
895 ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
896 ast_sorcery_object_get_id(publish));
900 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
901 client->sending = NULL;
902 } else if (param->code == 423) {
903 /* Update the expiration with the new expiration time if available */
904 pjsip_expires_hdr *expires;
906 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
907 if (!expires || !expires->ivalue) {
908 ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
909 ast_sorcery_object_get_id(publish));
910 pjsip_publishc_destroy(client->client);
911 client->client = NULL;
915 pjsip_publishc_update_expires(client->client, expires->ivalue);
916 client->sending = NULL;
917 } else if (client->sending) {
918 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
919 AST_LIST_REMOVE_HEAD(&client->queue, entry);
920 ast_free(client->sending);
921 client->sending = NULL;
923 ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
924 ast_sorcery_object_get_id(publish));
928 if (AST_LIST_EMPTY(&client->queue)) {
929 schedule_publish_refresh(client, param->expiration);
933 if (!client->client) {
934 struct sip_outbound_publish_message *message;
936 while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
940 if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
/*! \brief Number of buckets in each publish client's datastore container */
946 #define DATASTORE_BUCKETS 53
/*!
 * \brief Hashing function for datastores
 *
 * \param obj Lookup argument; how it is interpreted depends on the search flags
 * \param flags astobj2 search flags (only the OBJ_SEARCH_MASK bits are examined)
 *
 * \return ast_str_hash() of the datastore uid
 */
948 static int datastore_hash(const void *obj, int flags)
950 const struct ast_datastore *datastore;
953 switch (flags & OBJ_SEARCH_MASK) {
957 case OBJ_SEARCH_OBJECT:
959 uid = datastore->uid;
962 /* Hash can only work on something with a full key. */
967 return ast_str_hash(uid);
/*!
 * \brief Comparator function for datastores
 *
 * \param obj Datastore stored in the container
 * \param arg Either another datastore or a uid string, depending on flags
 * \param flags astobj2 search flags (only the OBJ_SEARCH_MASK bits are examined)
 *
 * Uids are compared in full, or by prefix for partial key searches.
 */
970 static int datastore_cmp(void *obj, void *arg, int flags)
972 const struct ast_datastore *object_left = obj;
973 const struct ast_datastore *object_right = arg;
974 const char *right_key = arg;
977 switch (flags & OBJ_SEARCH_MASK) {
978 case OBJ_SEARCH_OBJECT:
979 right_key = object_right->uid;
982 cmp = strcmp(object_left->uid, right_key);
984 case OBJ_SEARCH_PARTIAL_KEY:
985 cmp = strncmp(object_left->uid, right_key, strlen(right_key));
989 * What arg points to is specific to this traversal callback
990 * and has no special meaning to astobj2.
999 * At this point the traversal callback is identical to a sorted
1005 /*!
 * \brief Allocator function for publish state and its client
 *
 * \param publish The publish configuration the state is created for
 *
 * Allocates the state with room for a copy of the configuration id, then the
 * client and its datastore container. The refresh timer is pointed at the
 * client, and the client takes a reference to the configuration.
 */
1006 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1007 struct ast_sip_outbound_publish *publish)
1009 const char *id = ast_sorcery_object_get_id(publish);
1010 struct ast_sip_outbound_publish_state *state =
1011 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1017 state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1018 if (!state->client) {
1023 state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1024 if (!state->client->datastores) {
1029 state->client->timer.user_data = state->client;
1030 state->client->timer.cb = sip_outbound_publish_timer_cb;
1031 state->client->publish = ao2_bump(publish);
1033 strcpy(state->id, id); /* fits: the allocation above reserved strlen(id) + 1 extra bytes */
1037 /*!
 * \brief Apply function which finds or allocates a state structure
 *
 * \param sorcery The sorcery instance (not referenced in the visible lines)
 * \param obj The newly applied outbound publish configuration
 *
 * Rejects configurations that lack a server URI or an event type. An existing
 * state whose configuration can be reused has its publish pointer replaced by
 * the new configuration; otherwise a new state is allocated and its pjsip
 * client is created synchronously. The resulting state is linked into
 * new_states.
 */
1038 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1040 RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
1041 RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
1042 struct ast_sip_outbound_publish *applied = obj;
1044 if (ast_strlen_zero(applied->server_uri)) {
1045 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1046 ast_sorcery_object_get_id(applied));
1048 } else if (ast_strlen_zero(applied->event)) {
1049 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1050 ast_sorcery_object_get_id(applied));
1055 /* make sure new_states has been allocated as we will be adding to it */
1056 new_states = ao2_container_alloc_options(
1057 AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1058 outbound_publish_state_hash, outbound_publish_state_cmp);
1061 ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1067 state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
1069 if (can_reuse_publish(state->client->publish, applied)) {
1070 ao2_replace(state->client->publish, applied);
1079 state = sip_outbound_publish_state_alloc(applied);
1081 ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1082 ast_sorcery_object_get_id(applied));
1087 if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
1088 ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
1089 ast_sorcery_object_get_id(applied));
1093 ao2_link(new_states, state);
/*! \brief Custom handler for the "outbound_auth" option: initializes the publish's auth vector from the option value */
1097 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1099 struct ast_sip_outbound_publish *publish = obj;
1101 return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
/*!
 * \brief Module load: registers the "outbound-publish" sorcery object and its
 * fields, loads the configuration, synchronizes publish clients with the
 * registered handlers and initializes the pjsip publish client module.
 *
 * \return AST_MODULE_LOAD_DECLINE when the sorcery object cannot be registered,
 *         AST_MODULE_LOAD_SUCCESS otherwise
 */
1104 static int load_module(void)
1106 CHECK_PJSIP_MODULE_LOADED();
1108 ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1109 ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1111 if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1112 sip_outbound_publish_apply)) {
1113 return AST_MODULE_LOAD_DECLINE;
1116 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1117 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1118 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1119 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1120 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1121 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1122 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1123 ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1124 ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1126 ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1128 AST_RWLIST_RDLOCK(&publisher_handlers);
1129 sip_outbound_publish_synchronize(NULL);
1130 AST_RWLIST_UNLOCK(&publisher_handlers);
1132 pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1134 return AST_MODULE_LOAD_SUCCESS;
/*! \brief Module reload: reloads the "outbound-publish" objects and re-synchronizes publish clients under the handler list read lock */
1137 static int reload_module(void)
1139 ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1141 AST_RWLIST_RDLOCK(&publisher_handlers);
1142 sip_outbound_publish_synchronize(NULL);
1143 AST_RWLIST_UNLOCK(&publisher_handlers);
/*!
 * \brief Module unload: releases the current states and waits, with a deadline
 * of 10 seconds from entry, for every publish client to be destroyed.
 *
 * unloading.count starts at the number of states and is decremented by the
 * client destructor, which signals unloading.cond when it reaches zero.
 */
1147 static int unload_module(void)
1149 struct timeval start = ast_tvnow();
1150 struct timespec end = {
1151 .tv_sec = start.tv_sec + 10,
1152 .tv_nsec = start.tv_usec * 1000
1155 struct ao2_container *states = ao2_global_obj_ref(current_states);
/* NOTE(review): if this branch returns while states is non-NULL but empty, the reference taken above is never released - confirm against the full source */
1157 if (!states || !(unloading.count = ao2_container_count(states))) {
1160 ao2_ref(states, -1);
1162 ast_mutex_init(&unloading.lock);
1163 ast_cond_init(&unloading.cond, NULL);
1164 ast_mutex_lock(&unloading.lock);
1166 unloading.is_unloading = 1;
1167 ao2_global_obj_release(current_states);
1169 /* wait for items to unpublish */
1170 ast_verb(5, "Waiting to complete unpublishing task(s)\n");
1171 while (unloading.count) {
1172 res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
1174 ast_mutex_unlock(&unloading.lock);
1176 ast_mutex_destroy(&unloading.lock);
1177 ast_cond_destroy(&unloading.cond);
1180 ast_verb(5, "At least %d items were unable to unpublish "
1181 "in the allowed time\n", unloading.count);
1183 ast_verb(5, "All items successfully unpublished\n");
/* Module registration: exports global symbols and is load-ordered at channel-dependency priority */
1189 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1190 .load = load_module,
1191 .reload = reload_module,
1192 .unload = unload_module,
1193 .load_pri = AST_MODPRI_CHANNEL_DEPEND,