res_pjsip_outbound_publish: Add module which provides outbound PUBLISH support.
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <support_level>core</support_level>
23  ***/
24
25 #include "asterisk.h"
26
27 #include <pjsip.h>
28 #include <pjsip_simple.h>
29
30 #include "asterisk/res_pjsip.h"
31 #include "asterisk/res_pjsip_outbound_publish.h"
32 #include "asterisk/module.h"
33 #include "asterisk/taskprocessor.h"
34 #include "asterisk/datastore.h"
35
36 /*** DOCUMENTATION
37         <configInfo name="res_pjsip_outbound_publish" language="en_US">
38                 <synopsis>SIP resource for outbound publish</synopsis>
39                 <description><para>
40                         <emphasis>Outbound Publish</emphasis>
41                         </para>
42                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
43                 </description>
44                 <configFile name="pjsip.conf">
45                         <configObject name="outbound-publish">
46                                 <synopsis>The configuration for outbound publish</synopsis>
47                                 <description><para>
48                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
49                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
50                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
51                                 </para></description>
52                                 <configOption name="expiration" default="3600">
53                                         <synopsis>Expiration time for publications in seconds</synopsis>
54                                 </configOption>
55                                 <configOption name="outbound_auth" default="">
56                                         <synopsis>Authentication object to be used for outbound publishes.</synopsis>
57                                 </configOption>
58                                 <configOption name="outbound_proxy" default="">
59                                         <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
60                                 </configOption>
61                                 <configOption name="server_uri">
62                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
63                                         <description><para>
64                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
65                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
66                                         </para></description>
67                                 </configOption>
68                                 <configOption name="from_uri">
69                                         <synopsis>SIP URI to use in the From header</synopsis>
70                                         <description><para>
71                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
72                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
73                                                 will be used.
74                                         </para></description>
75                                 </configOption>
76                                 <configOption name="to_uri">
77                                         <synopsis>SIP URI to use in the To header</synopsis>
78                                         <description><para>
79                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
80                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
81                                                 will be used.
82                                         </para></description>
83                                 </configOption>
84                                 <configOption name="event" default="">
85                                         <synopsis>Event type of the PUBLISH.</synopsis>
86                                 </configOption>
87                                 <configOption name="max_auth_attempts" default="5">
88                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
89                                 </configOption>
90                                 <configOption name="type">
91                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
92                                 </configOption>
93                         </configObject>
94                 </configFile>
95         </configInfo>
96  ***/
97
98 /*! \brief Queued outbound publish message */
99 struct sip_outbound_publish_message {
100         /*! \brief Optional body */
101         struct ast_sip_body body;
102         /*! \brief Linked list information */
103         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
104         /*! \brief Extra space for body contents */
105         char body_contents[0];
106 };
107
108 /*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
109 struct ast_sip_outbound_publish_client {
110         /*! \brief Underlying publish client */
111         pjsip_publishc *client;
112         /*! \brief Timer entry for refreshing publish */
113         pj_timer_entry timer;
114         /*! \brief Publisher datastores set up by handlers */
115         struct ao2_container *datastores;
116         /*! \brief The number of auth attempts done */
117         unsigned int auth_attempts;
118         /*! \brief Queue of outgoing publish messages to send*/
119         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
120         /*! \brief The message currently being sent */
121         struct sip_outbound_publish_message *sending;
122         /*! \brief Publish client has been fully started and event type informed */
123         unsigned int started;
124         /*! \brief Publish client should be destroyed */
125         unsigned int destroy;
126 };
127
128 /*! \brief Outbound publish information */
129 struct ast_sip_outbound_publish {
130         /*! \brief Sorcery object details */
131         SORCERY_OBJECT(details);
132         /*! \brief Stringfields */
133         AST_DECLARE_STRING_FIELDS(
134                 /*! \brief URI for the entity and server */
135                 AST_STRING_FIELD(server_uri);
136                 /*! \brief URI for the From header */
137                 AST_STRING_FIELD(from_uri);
138                 /*! \brief URI for the To header */
139                 AST_STRING_FIELD(to_uri);
140                 /*! \brief Outbound proxy to use */
141                 AST_STRING_FIELD(outbound_proxy);
142                 /*! \brief The event type to publish */
143                 AST_STRING_FIELD(event);
144         );
145         /*! \brief Requested expiration time */
146         unsigned int expiration;
147         /*! \brief Maximum number of auth attempts before stopping the publish client */
148         unsigned int max_auth_attempts;
149         /*! \brief Configured authentication credentials */
150         struct ast_sip_auth_vector outbound_auths;
151         /*! \brief Outbound publish state */
152         struct ast_sip_outbound_publish_client *state;
153 };
154
155 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
156
157 /*! \brief Container of currently active publish clients */
158 static AO2_GLOBAL_OBJ_STATIC(active);
159
160 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
161 {
162         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
163         ast_module_ref(ast_module_info->self);
164 }
165
166 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
167 {
168         struct ast_sip_event_publisher_handler *iter;
169
170         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
171                 if (!strcmp(iter->event_name, event_name)) {
172                         break;
173                 }
174         }
175         return iter;
176 }
177
178 /*! \brief Helper function which cancels the refresh timer on a client */
179 static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
180 {
181         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
182                 /* The timer was successfully cancelled, drop the refcount of the client */
183                 ao2_ref(client, -1);
184         }
185 }
186
187 /*! \brief Helper function which sets up the timer to send publication */
188 static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, pjsip_rx_data *rdata)
189 {
190         pj_time_val delay = { .sec = 0, };
191         pjsip_expires_hdr *expires;
192
193         cancel_publish_refresh(publish->state);
194
195         /* Determine when we should refresh - we favor the Expires header if possible */
196         expires = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
197         if (expires) {
198                 delay.sec = expires->ivalue - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
199         }
200         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
201                 delay.sec = publish->expiration;
202         }
203         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
204                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
205         }
206
207         ao2_ref(publish->state, +1);
208         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publish->state->timer, &delay) != PJ_SUCCESS) {
209                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
210                 ao2_ref(publish->state, -1);
211         }
212 }
213
214 /*! \brief Publish client timer callback function */
215 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
216 {
217         struct ast_sip_outbound_publish_client *client = entry->user_data;
218
219         ao2_lock(client);
220         if (AST_LIST_EMPTY(&client->queue)) {
221                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
222                 ast_sip_publish_client_send(client, NULL);
223         }
224         ao2_unlock(client);
225
226         ao2_ref(client, -1);
227 }
228
229 /*! \brief Task for cancelling a refresh timer */
230 static int cancel_refresh_timer_task(void *data)
231 {
232         struct ast_sip_outbound_publish_client *state = data;
233
234         cancel_publish_refresh(state);
235         ao2_ref(state, -1);
236
237         return 0;
238 }
239
240 /*! \brief Task for sending an unpublish */
241 static int send_unpublish_task(void *data)
242 {
243         struct ast_sip_outbound_publish_client *state = data;
244         pjsip_tx_data *tdata;
245
246         if (pjsip_publishc_unpublish(state->client, &tdata) == PJ_SUCCESS) {
247                 pjsip_publishc_send(state->client, tdata);
248         }
249
250         ao2_ref(state, -1);
251
252         return 0;
253 }
254
255 /*! \brief Helper function which starts or stops publish clients when applicable */
256 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
257 {
258         RAII_VAR(struct ao2_container *, publishes, ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL), ao2_cleanup);
259         struct ao2_iterator i;
260         struct ast_sip_outbound_publish *publish;
261
262         if (!publishes) {
263                 return;
264         }
265
266         i = ao2_iterator_init(publishes, 0);
267         while ((publish = ao2_iterator_next(&i))) {
268                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
269
270                 if (!publish->state->started) {
271                         /* If the publisher client has not yet been started try to start it */
272                         if (!handler) {
273                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
274                                         publish->event, ast_sorcery_object_get_id(publish));
275                         } else if (handler->start_publishing(publish, publish->state)) {
276                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
277                                         publish->event, ast_sorcery_object_get_id(publish));
278                         } else {
279                                 publish->state->started = 1;
280                         }
281                 } else if (publish->state->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
282                         /* If the publisher client has been started but it is going away stop it */
283                         removed->stop_publishing(publish->state);
284                         publish->state->started = 0;
285                         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publish->state))) {
286                                 ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
287                                         ast_sorcery_object_get_id(publish));
288                                 ao2_ref(publish->state, -1);
289                         }
290                 }
291                 ao2_ref(publish, -1);
292         }
293         ao2_iterator_destroy(&i);
294 }
295
296 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
297 {
298         RAII_VAR(struct ast_sip_outbound_publish *, publish, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish", name), ao2_cleanup);
299
300         if (!publish) {
301                 return NULL;
302         }
303
304         return ao2_bump(publish->state);
305 }
306
307 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
308 {
309         struct ast_sip_event_publisher_handler *existing;
310         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
311
312         if (!handler->start_publishing || !handler->stop_publishing) {
313                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
314                 return -1;
315         } else if (ast_strlen_zero(handler->event_name)) {
316                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
317                 return -1;
318         }
319
320         existing = find_publisher_handler_for_event_name(handler->event_name);
321         if (existing) {
322                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
323                                 "A handler is already registered\n", handler->event_name);
324                 return -1;
325         }
326
327         sub_add_handler(handler);
328
329         sip_outbound_publish_synchronize(NULL);
330
331         return 0;
332 }
333
334 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
335 {
336         struct ast_sip_event_publisher_handler *iter;
337         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
338         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
339                 if (handler == iter) {
340                         AST_RWLIST_REMOVE_CURRENT(next);
341                         ast_module_unref(ast_module_info->self);
342                         break;
343                 }
344         }
345         AST_RWLIST_TRAVERSE_SAFE_END;
346
347         sip_outbound_publish_synchronize(handler);
348 }
349
350 /*! \brief Destructor function for publish information */
351 static void sip_outbound_publish_destroy(void *obj)
352 {
353         struct ast_sip_outbound_publish *publish = obj;
354         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
355         struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
356
357         if (handler) {
358                 handler->stop_publishing(publish->state);
359         }
360         if (publish->state) {
361                 cancel_publish_refresh(publish->state);
362                 ao2_ref(publish->state, -1);
363         }
364         ast_sip_auth_vector_destroy(&publish->outbound_auths);
365
366         ast_string_field_free_memory(publish);
367 }
368
369 /*! \brief Allocator function for publish information */
370 static void *sip_outbound_publish_alloc(const char *name)
371 {
372         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
373                 sip_outbound_publish_destroy);
374
375         if (!publish || ast_string_field_init(publish, 256)) {
376                 ao2_cleanup(publish);
377                 return NULL;
378         }
379
380         return publish;
381 }
382
383 static void sip_outbound_publish_datastore_destroy(void *obj)
384 {
385         struct ast_datastore *datastore = obj;
386
387         /* Using the destroy function (if present) destroy the data */
388         if (datastore->info->destroy != NULL && datastore->data != NULL) {
389                 datastore->info->destroy(datastore->data);
390                 datastore->data = NULL;
391         }
392
393         ast_free((void *) datastore->uid);
394         datastore->uid = NULL;
395 }
396
397 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
398 {
399         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
400         const char *uid_ptr = uid;
401         char uuid_buf[AST_UUID_STR_LEN];
402
403         if (!info) {
404                 return NULL;
405         }
406
407         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
408         if (!datastore) {
409                 return NULL;
410         }
411
412         datastore->info = info;
413         if (ast_strlen_zero(uid)) {
414                 /* They didn't provide an ID so we'll provide one ourself */
415                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
416         }
417
418         datastore->uid = ast_strdup(uid_ptr);
419         if (!datastore->uid) {
420                 return NULL;
421         }
422
423         ao2_ref(datastore, +1);
424         return datastore;
425 }
426
427 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
428         struct ast_datastore *datastore)
429 {
430         ast_assert(datastore != NULL);
431         ast_assert(datastore->info != NULL);
432         ast_assert(!ast_strlen_zero(datastore->uid));
433
434         if (!ao2_link(client->datastores, datastore)) {
435                 return -1;
436         }
437         return 0;
438 }
439
440 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
441         const char *name)
442 {
443         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
444 }
445
446 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
447         const char *name)
448 {
449         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
450 }
451
452 static int sip_publish_client_service_queue(void *data)
453 {
454         RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
455         SCOPED_AO2LOCK(lock, client);
456         struct sip_outbound_publish_message *message;
457         pjsip_tx_data *tdata;
458         pj_status_t status;
459
460         if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
461                 return 0;
462         }
463
464         if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
465                 goto fatal;
466         }
467
468         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
469                 ast_sip_add_body(tdata, &message->body)) {
470                 pjsip_tx_data_dec_ref(tdata);
471                 goto fatal;
472         }
473
474         status = pjsip_publishc_send(client->client, tdata);
475         if (status == PJ_EBUSY) {
476                 /* We attempted to send the message but something else got there first */
477                 goto service;
478         } else if (status != PJ_SUCCESS) {
479                 goto fatal;
480         }
481
482         client->sending = message;
483
484         return 0;
485
486 fatal:
487         AST_LIST_REMOVE_HEAD(&client->queue, entry);
488         ast_free(message);
489
490 service:
491         if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
492                 ao2_ref(client, -1);
493         }
494         return -1;
495 }
496
497 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
498         const struct ast_sip_body *body)
499 {
500         SCOPED_AO2LOCK(lock, client);
501         struct sip_outbound_publish_message *message;
502         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
503         int res;
504
505         if (!client->client) {
506                 return -1;
507         }
508
509         /* If a body is present we need more space for the contents of it */
510         if (body) {
511                 type_len = strlen(body->type) + 1;
512                 subtype_len = strlen(body->subtype) + 1;
513                 body_text_len = strlen(body->body_text) + 1;
514         }
515
516         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
517         if (!message) {
518                 return -1;
519         }
520
521         if (body) {
522                 char *dst = message->body_contents;
523
524                 message->body.type = strcpy(dst, body->type);
525                 dst += type_len;
526                 message->body.subtype = strcpy(dst, body->subtype);
527                 dst += subtype_len;
528                 message->body.body_text = strcpy(dst, body->body_text);
529         }
530
531         AST_LIST_INSERT_TAIL(&client->queue, message, entry);
532
533         res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
534         if (res) {
535                 ao2_ref(client, -1);
536         }
537
538         return res;
539 }
540
541 /*! \brief Destructor function for publish state */
542 static void sip_outbound_publish_client_destroy(void *obj)
543 {
544         struct ast_sip_outbound_publish_client *state = obj;
545         struct sip_outbound_publish_message *message;
546
547         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
548
549         while ((message = AST_LIST_REMOVE_HEAD(&state->queue, entry))) {
550                 ast_free(message);
551         }
552
553         ao2_cleanup(state->datastores);
554 }
555
556 /*!
557  * \internal
558  * \brief Check if a publish can be reused
559  *
560  * This checks if the existing outbound publish's configuration differs from a newly-applied
561  * outbound publish.
562  *
563  * \param existing The pre-existing outbound publish
564  * \param applied The newly-created publish
565  */
566 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
567 {
568         int i;
569
570         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
571                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
572                 strcmp(existing->event, applied->event) ||
573                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
574                 return 0;
575         }
576
577         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
578                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
579                         return 0;
580                 }
581         }
582
583         return 1;
584 }
585
586 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
587
588 /*! \brief Helper function that allocates a pjsip publish client and configures it */
589 static int sip_outbound_publish_client_alloc(void *data)
590 {
591         struct ast_sip_outbound_publish *publish = data;
592         pjsip_publishc_opt opt = {
593                 .queue_request = PJ_FALSE,
594         };
595         pj_str_t event, server_uri, to_uri, from_uri;
596         pj_status_t status;
597
598         if (publish->state->client) {
599                 return 0;
600         } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(publish), sip_outbound_publish_callback,
601                 &publish->state->client) != PJ_SUCCESS) {
602                 ao2_ref(publish, -1);
603                 return -1;
604         }
605
606         if (!ast_strlen_zero(publish->outbound_proxy)) {
607                 pjsip_route_hdr route_set, *route;
608                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
609
610                 pj_list_init(&route_set);
611
612                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publish->state->client), &ROUTE_HNAME,
613                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
614                         pjsip_publishc_destroy(publish->state->client);
615                         return -1;
616                 }
617                 pj_list_insert_nodes_before(&route_set, route);
618
619                 pjsip_publishc_set_route_set(publish->state->client, &route_set);
620         }
621
622         pj_cstr(&event, publish->event);
623         pj_cstr(&server_uri, publish->server_uri);
624         pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
625         pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
626
627         status = pjsip_publishc_init(publish->state->client, &event, &server_uri, &from_uri, &to_uri,
628                 publish->expiration);
629         if (status == PJSIP_EINVALIDURI) {
630                 pj_pool_t *pool;
631                 pj_str_t tmp;
632                 pjsip_uri *uri;
633
634                 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
635                 if (!pool) {
636                         ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
637                                 ast_sorcery_object_get_id(publish));
638                         pjsip_publishc_destroy(publish->state->client);
639                         return -1;
640                 }
641
642                 pj_strdup2_with_null(pool, &tmp, publish->server_uri);
643                 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
644                 if (!uri) {
645                         ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
646                                 publish->server_uri, ast_sorcery_object_get_id(publish));
647                 }
648
649                 if (!ast_strlen_zero(publish->to_uri)) {
650                         pj_strdup2_with_null(pool, &tmp, publish->to_uri);
651                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
652                         if (!uri) {
653                                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
654                                         publish->to_uri, ast_sorcery_object_get_id(publish));
655                         }
656                 }
657
658                 if (!ast_strlen_zero(publish->from_uri)) {
659                         pj_strdup2_with_null(pool, &tmp, publish->from_uri);
660                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
661                         if (!uri) {
662                                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
663                                         publish->from_uri, ast_sorcery_object_get_id(publish));
664                         }
665                 }
666
667                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
668                 pjsip_publishc_destroy(publish->state->client);
669                 return -1;
670         } else if (status != PJ_SUCCESS) {
671                 pjsip_publishc_destroy(publish->state->client);
672                 return -1;
673         }
674
675         return 0;
676 }
677
678 /*! \brief Callback function for publish client responses */
679 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
680 {
681         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(param->token), ao2_cleanup);
682         SCOPED_AO2LOCK(lock, publish->state);
683         pjsip_tx_data *tdata;
684
685         if (publish->state->destroy) {
686                 if (publish->state->sending) {
687                         publish->state->sending = NULL;
688                         if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publish->state))) {
689                                 return;
690                         }
691                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
692                                 ast_sorcery_object_get_id(publish));
693                         ao2_ref(publish->state, -1);
694                 }
695                 /* Once the destroy is called this callback will not get called any longer, so drop the publish ref */
696                 pjsip_publishc_destroy(publish->state->client);
697                 ao2_ref(publish, -1);
698                 return;
699         }
700
701         if (param->code == 401 || param->code == 407) {
702                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
703                                 param->rdata, pjsip_rdata_get_tsx(param->rdata), &tdata)) {
704                         pjsip_publishc_send(publish->state->client, tdata);
705                 }
706                 publish->state->auth_attempts++;
707
708                 if (publish->state->auth_attempts == publish->max_auth_attempts) {
709                         pjsip_publishc_destroy(publish->state->client);
710                         publish->state->client = NULL;
711
712                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
713                                 ast_sorcery_object_get_id(publish));
714
715                         goto end;
716                 }
717
718                 return;
719         }
720
721         publish->state->auth_attempts = 0;
722
723         if (param->code == 412) {
724                 pjsip_publishc_destroy(publish->state->client);
725                 publish->state->client = NULL;
726
727                 if (sip_outbound_publish_client_alloc(publish)) {
728                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
729                                 ast_sorcery_object_get_id(publish));
730                         goto end;
731                 }
732
733                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
734                 publish->state->sending = NULL;
735         } else if (param->code == 423) {
736                 /* Update the expiration with the new expiration time if available */
737                 pjsip_expires_hdr *expires;
738
739                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
740                 if (!expires || !expires->ivalue) {
741                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
742                                 ast_sorcery_object_get_id(publish));
743                         pjsip_publishc_destroy(publish->state->client);
744                         publish->state->client = NULL;
745                         goto end;
746                 }
747
748                 pjsip_publishc_update_expires(publish->state->client, expires->ivalue);
749                 publish->state->sending = NULL;
750         } else if (publish->state->sending) {
751                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
752                 AST_LIST_REMOVE_HEAD(&publish->state->queue, entry);
753                 ast_free(publish->state->sending);
754                 publish->state->sending = NULL;
755         }
756
757         if (AST_LIST_EMPTY(&publish->state->queue)) {
758                 schedule_publish_refresh(publish, param->rdata);
759         }
760
761 end:
762         if (!publish->state->client) {
763                 struct sip_outbound_publish_message *message;
764
765                 while ((message = AST_LIST_REMOVE_HEAD(&publish->state->queue, entry))) {
766                         ast_free(message);
767                 }
768         } else {
769                 if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(publish->state))) {
770                         ao2_ref(publish->state, -1);
771                 }
772         }
773 }
774
775 #define DATASTORE_BUCKETS 53
776
777 static int datastore_hash(const void *obj, int flags)
778 {
779         const struct ast_datastore *datastore;
780         const char *uid;
781
782         switch (flags & OBJ_SEARCH_MASK) {
783         case OBJ_SEARCH_KEY:
784                 uid = obj;
785                 break;
786         case OBJ_SEARCH_OBJECT:
787                 datastore = obj;
788                 uid = datastore->uid;
789                 break;
790         default:
791                 /* Hash can only work on something with a full key. */
792                 ast_assert(0);
793                 return 0;
794         }
795
796         return ast_str_hash(uid);
797 }
798
799 static int datastore_cmp(void *obj, void *arg, int flags)
800 {
801         const struct ast_datastore *object_left = obj;
802         const struct ast_datastore *object_right = arg;
803         const char *right_key = arg;
804         int cmp;
805
806         switch (flags & OBJ_SEARCH_MASK) {
807         case OBJ_SEARCH_OBJECT:
808                 right_key = object_right->uid;
809                 /* Fall through */
810         case OBJ_SEARCH_KEY:
811                 cmp = strcmp(object_left->uid, right_key);
812                 break;
813         case OBJ_SEARCH_PARTIAL_KEY:
814         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
815                 break;
816         default:
817                 /*
818                  * What arg points to is specific to this traversal callback
819                  * and has no special meaning to astobj2.
820                  */
821                 cmp = 0;
822                 break;
823         }
824         if (cmp) {
825                 return 0;
826         }
827         /*
828          * At this point the traversal callback is identical to a sorted
829          * container.
830          */
831         return CMP_MATCH;
832 }
833
834 /*! \brief Allocator function for publish client state */
835 static struct ast_sip_outbound_publish_client *sip_outbound_publish_state_alloc(void)
836 {
837         struct ast_sip_outbound_publish_client *state = ao2_alloc(sizeof(*state), sip_outbound_publish_client_destroy);
838
839         if (!state) {
840                 return NULL;
841         }
842
843         state->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
844         if (!state->datastores) {
845                 ao2_ref(state, -1);
846                 return NULL;
847         }
848
849         state->timer.user_data = state;
850         state->timer.cb = sip_outbound_publish_timer_cb;
851
852         return state;
853 }
854
855 /*! \brief Apply function which finds or allocates a state structure */
856 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
857 {
858         RAII_VAR(struct ast_sip_outbound_publish *, existing, ast_sorcery_retrieve_by_id(sorcery, "outbound-publish", ast_sorcery_object_get_id(obj)), ao2_cleanup);
859         struct ast_sip_outbound_publish *applied = obj;
860
861         if (ast_strlen_zero(applied->server_uri)) {
862                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
863                         ast_sorcery_object_get_id(applied));
864                 return -1;
865         } else if (ast_strlen_zero(applied->event)) {
866                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
867                         ast_sorcery_object_get_id(applied));
868                 return -1;
869         }
870
871         if (!existing) {
872                 /* If no existing publish exists we can just start fresh easily */
873                 applied->state = sip_outbound_publish_state_alloc();
874         } else {
875                 /* If there is an existing publish things are more complicated, we can immediately reuse this state if most stuff remains unchanged */
876                 if (can_reuse_publish(existing, applied)) {
877                         applied->state = existing->state;
878                         ao2_ref(applied->state, +1);
879                 } else {
880                         applied->state = sip_outbound_publish_state_alloc();
881                 }
882         }
883
884         if (!applied->state) {
885                 return -1;
886         }
887
888         return ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, applied);
889 }
890
891 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
892 {
893         struct ast_sip_outbound_publish *publish = obj;
894
895         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
896 }
897
898 /*! \brief Helper function which prunes old publish clients */
899 static void prune_publish_clients(const char *object_type)
900 {
901         struct ao2_container *old, *current;
902
903         old = ao2_global_obj_ref(active);
904         if (old) {
905                 struct ao2_iterator i;
906                 struct ast_sip_outbound_publish *existing;
907
908                 i = ao2_iterator_init(old, 0);
909                 for (; (existing = ao2_iterator_next(&i)); ao2_ref(existing, -1)) {
910                         struct ast_sip_outbound_publish *created;
911
912                         created = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish",
913                                 ast_sorcery_object_get_id(existing));
914                         if (created) {
915                                 if (created->state == existing->state) {
916                                         ao2_ref(created, -1);
917                                         continue;
918                                 }
919                                 ao2_ref(created, -1);
920                         }
921
922                         ao2_lock(existing->state);
923
924                         /* If this publish client is currently publishing stop and terminate any refresh timer */
925                         if (existing->state->started) {
926                                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(existing->event);
927
928                                 if (handler) {
929                                         handler->stop_publishing(existing->state);
930                                 }
931
932                                 if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(existing->state))) {
933                                         ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
934                                                 ast_sorcery_object_get_id(existing));
935                                         ao2_ref(existing->state, -1);
936                                 }
937                         }
938
939                         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
940                         if (!existing->state->sending) {
941                                 if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(existing->state))) {
942                                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
943                                                 ast_sorcery_object_get_id(existing));
944                                         ao2_ref(existing->state, -1);
945                                 }
946                         }
947
948                         existing->state->destroy = 1;
949                         ao2_unlock(existing->state);
950                 }
951                 ao2_iterator_destroy(&i);
952
953                 ao2_ref(old, -1);
954         }
955
956         current = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
957         ao2_global_obj_replace_unref(active, current);
958 }
959
960 static struct ast_sorcery_observer outbound_publish_observer = {
961         .loaded = prune_publish_clients,
962 };
963
964 static int load_module(void)
965 {
966         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
967
968         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
969                 sip_outbound_publish_apply)) {
970                 return AST_MODULE_LOAD_DECLINE;
971         }
972
973         ast_sorcery_observer_add(ast_sip_get_sorcery(), "outbound-publish", &outbound_publish_observer);
974
975         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
976         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
977         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
978         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
979         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
980         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
981         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
982         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
983         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
984         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
985
986         AST_RWLIST_RDLOCK(&publisher_handlers);
987         sip_outbound_publish_synchronize(NULL);
988         AST_RWLIST_UNLOCK(&publisher_handlers);
989
990         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
991
992         return AST_MODULE_LOAD_SUCCESS;
993 }
994
995 static int reload_module(void)
996 {
997         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
998
999         AST_RWLIST_RDLOCK(&publisher_handlers);
1000         sip_outbound_publish_synchronize(NULL);
1001         AST_RWLIST_UNLOCK(&publisher_handlers);
1002         return 0;
1003 }
1004
1005 static int unload_module(void)
1006 {
1007         return 0;
1008 }
1009
1010 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1011                 .load = load_module,
1012                 .reload = reload_module,
1013                 .unload = unload_module,
1014                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1015                );