res_pjsip_outbound_publish: Add transport for outbound PUBLISH
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <support_level>core</support_level>
23  ***/
24
25 #include "asterisk.h"
26
27 #include <pjsip.h>
28 #include <pjsip_simple.h>
29
30 #include "asterisk/res_pjsip.h"
31 #include "asterisk/res_pjsip_outbound_publish.h"
32 #include "asterisk/module.h"
33 #include "asterisk/taskprocessor.h"
34 #include "asterisk/datastore.h"
35
36 /*** DOCUMENTATION
37         <configInfo name="res_pjsip_outbound_publish" language="en_US">
38                 <synopsis>SIP resource for outbound publish</synopsis>
39                 <description><para>
40                         <emphasis>Outbound Publish</emphasis>
41                         </para>
42                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
43                 </description>
44                 <configFile name="pjsip.conf">
45                         <configObject name="outbound-publish">
46                                 <synopsis>The configuration for outbound publish</synopsis>
47                                 <description><para>
48                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
49                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
50                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
51                                 </para></description>
52                                 <configOption name="expiration" default="3600">
53                                         <synopsis>Expiration time for publications in seconds</synopsis>
54                                 </configOption>
55                                 <configOption name="outbound_auth" default="">
56                                         <synopsis>Authentication object to be used for outbound publishes.</synopsis>
57                                 </configOption>
58                                 <configOption name="outbound_proxy" default="">
59                                         <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
60                                 </configOption>
61                                 <configOption name="server_uri">
62                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
63                                         <description><para>
64                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
65                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
66                                         </para></description>
67                                 </configOption>
68                                 <configOption name="from_uri">
69                                         <synopsis>SIP URI to use in the From header</synopsis>
70                                         <description><para>
71                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
72                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
73                                                 will be used.
74                                         </para></description>
75                                 </configOption>
76                                 <configOption name="to_uri">
77                                         <synopsis>SIP URI to use in the To header</synopsis>
78                                         <description><para>
79                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
80                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
81                                                 will be used.
82                                         </para></description>
83                                 </configOption>
84                                 <configOption name="event" default="">
85                                         <synopsis>Event type of the PUBLISH.</synopsis>
86                                 </configOption>
87                                 <configOption name="max_auth_attempts" default="5">
88                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
89                                 </configOption>
90                                 <configOption name="transport">
91                                         <synopsis>Transport used for outbound publish</synopsis>
92                                         <description>
93                                                 <note><para>A <replaceable>transport</replaceable> configured in
94                                                 <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
95                                         </description>
96                                 </configOption>
97                                 <configOption name="type">
98                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
99                                 </configOption>
100                         </configObject>
101                 </configFile>
102         </configInfo>
103  ***/
104
105 /*! \brief Queued outbound publish message */
106 struct sip_outbound_publish_message {
107         /*! \brief Optional body */
108         struct ast_sip_body body;
109         /*! \brief Linked list information */
110         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
111         /*! \brief Extra space for body contents */
112         char body_contents[0];
113 };
114
115 /*! \brief Outbound publish information */
116 struct ast_sip_outbound_publish {
117         /*! \brief Sorcery object details */
118         SORCERY_OBJECT(details);
119         /*! \brief Stringfields */
120         AST_DECLARE_STRING_FIELDS(
121                 /*! \brief URI for the entity and server */
122                 AST_STRING_FIELD(server_uri);
123                 /*! \brief URI for the From header */
124                 AST_STRING_FIELD(from_uri);
125                 /*! \brief URI for the To header */
126                 AST_STRING_FIELD(to_uri);
127                 /*! \brief Explicit transport to use for publish */
128                 AST_STRING_FIELD(transport);
129                 /*! \brief Outbound proxy to use */
130                 AST_STRING_FIELD(outbound_proxy);
131                 /*! \brief The event type to publish */
132                 AST_STRING_FIELD(event);
133         );
134         /*! \brief Requested expiration time */
135         unsigned int expiration;
136         /*! \brief Maximum number of auth attempts before stopping the publish client */
137         unsigned int max_auth_attempts;
138         /*! \brief Configured authentication credentials */
139         struct ast_sip_auth_vector outbound_auths;
140 };
141
142 /*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
143 struct ast_sip_outbound_publish_client {
144         /*! \brief Underlying publish client */
145         pjsip_publishc *client;
146         /*! \brief Timer entry for refreshing publish */
147         pj_timer_entry timer;
148         /*! \brief Publisher datastores set up by handlers */
149         struct ao2_container *datastores;
150         /*! \brief The number of auth attempts done */
151         unsigned int auth_attempts;
152         /*! \brief Queue of outgoing publish messages to send*/
153         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
154         /*! \brief The message currently being sent */
155         struct sip_outbound_publish_message *sending;
156         /*! \brief Publish client has been fully started and event type informed */
157         unsigned int started;
158         /*! \brief Publish client should be destroyed */
159         unsigned int destroy;
160         /*! \brief Outbound publish information */
161         struct ast_sip_outbound_publish *publish;
162         /*! \brief The name of the transport to be used for the publish */
163         char *transport_name;
164 };
165
/*!
 * \brief Container entry tying a publish id to its client
 *
 * Hashed and compared on \c id by outbound_publish_state_hash() and
 * outbound_publish_state_cmp().
 */
struct ast_sip_outbound_publish_state {
	/*! \brief The client owned by this state */
	struct ast_sip_outbound_publish_client *client;
	/*! \brief Lookup key; same value as the publish configuration id */
	char id[0];
};
173
174 /*! \brief Unloading data */
175 struct unloading_data {
176         int is_unloading;
177         int count;
178         ast_mutex_t lock;
179         ast_cond_t cond;
180 } unloading;
181
/*! \brief Bucket count used for the client state containers */
#define DEFAULT_STATE_BUCKETS 31
/*! \brief The state container currently in effect */
static AO2_GLOBAL_OBJ_STATIC(current_states);
/*!
 * \brief State container being assembled during a [re]load
 *
 * Promoted to \c current_states by get_publishes_and_update_state().
 */
static struct ao2_container *new_states;
187
188 /*! \brief hashing function for state objects */
189 static int outbound_publish_state_hash(const void *obj, const int flags)
190 {
191         const struct ast_sip_outbound_publish_state *object;
192         const char *key;
193
194         switch (flags & OBJ_SEARCH_MASK) {
195         case OBJ_SEARCH_KEY:
196                 key = obj;
197                 break;
198         case OBJ_SEARCH_OBJECT:
199                 object = obj;
200                 key = object->id;
201                 break;
202         default:
203                 ast_assert(0);
204                 return 0;
205         }
206         return ast_str_hash(key);
207 }
208
209 /*! \brief comparator function for client objects */
210 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
211 {
212         const struct ast_sip_outbound_publish_state *object_left = obj;
213         const struct ast_sip_outbound_publish_state *object_right = arg;
214         const char *right_key = arg;
215         int cmp;
216
217         switch (flags & OBJ_SEARCH_MASK) {
218         case OBJ_SEARCH_OBJECT:
219                 right_key = object_right->id;
220                 /* Fall through */
221         case OBJ_SEARCH_KEY:
222                 cmp = strcmp(object_left->id, right_key);
223                 break;
224         case OBJ_SEARCH_PARTIAL_KEY:
225                 /* Not supported by container. */
226                 ast_assert(0);
227                 return 0;
228         default:
229                 cmp = 0;
230                 break;
231         }
232         if (cmp) {
233                 return 0;
234         }
235         return CMP_MATCH;
236 }
237
238 static struct ao2_container *get_publishes_and_update_state(void)
239 {
240         struct ao2_container *container;
241
242         container = ast_sorcery_retrieve_by_fields(
243                 ast_sip_get_sorcery(), "outbound-publish",
244                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
245
246         if (!new_states) {
247                 return container;
248         }
249
250         ao2_global_obj_replace_unref(current_states, new_states);
251         ao2_cleanup(new_states);
252         new_states = NULL;
253
254         return container;
255 }
256
257 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
258
259 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
260 {
261         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
262         ast_module_ref(ast_module_info->self);
263 }
264
265 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
266 {
267         struct ast_sip_event_publisher_handler *iter;
268
269         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
270                 if (!strcmp(iter->event_name, event_name)) {
271                         break;
272                 }
273         }
274         return iter;
275 }
276
277 /*! \brief Helper function which cancels the refresh timer on a client */
278 static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
279 {
280         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
281                 /* The timer was successfully cancelled, drop the refcount of the client */
282                 ao2_ref(client, -1);
283         }
284 }
285
286 /*! \brief Helper function which sets up the timer to send publication */
287 static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration)
288 {
289         struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
290         pj_time_val delay = { .sec = 0, };
291
292         cancel_publish_refresh(client);
293
294         if (expiration > 0) {
295                 delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
296         }
297         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
298                 delay.sec = publish->expiration;
299         }
300         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
301                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
302         }
303
304         ao2_ref(client, +1);
305         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
306                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
307                 ao2_ref(client, -1);
308         }
309         ao2_ref(publish, -1);
310 }
311
312 /*! \brief Publish client timer callback function */
313 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
314 {
315         struct ast_sip_outbound_publish_client *client = entry->user_data;
316
317         ao2_lock(client);
318         if (AST_LIST_EMPTY(&client->queue)) {
319                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
320                 ast_sip_publish_client_send(client, NULL);
321         }
322         ao2_unlock(client);
323
324         ao2_ref(client, -1);
325 }
326
/*!
 * \brief Task wrapper around cancel_publish_refresh()
 *
 * \param data The publish client; the reference passed with the task is consumed
 *
 * \retval 0 always
 */
static int cancel_refresh_timer_task(void *data)
{
	struct ast_sip_outbound_publish_client *target = data;

	cancel_publish_refresh(target);

	/* Release the reference that travelled with the task */
	ao2_ref(target, -1);
	return 0;
}
337
338 /*! \brief Task for sending an unpublish */
339 static int send_unpublish_task(void *data)
340 {
341         struct ast_sip_outbound_publish_client *client = data;
342         pjsip_tx_data *tdata;
343
344         if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
345                 if (!ast_strlen_zero(client->transport_name)) {
346                         pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
347                         ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector);
348                         pjsip_tx_data_set_transport(tdata, &selector);
349                 }
350
351                 pjsip_publishc_send(client->client, tdata);
352         }
353
354         ao2_ref(client, -1);
355
356         return 0;
357 }
358
359 /*! \brief Helper function which starts or stops publish clients when applicable */
360 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
361 {
362         RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
363         struct ao2_container *states;
364         struct ao2_iterator i;
365         struct ast_sip_outbound_publish_state *state;
366
367         if (!publishes) {
368                 return;
369         }
370
371         states = ao2_global_obj_ref(current_states);
372         if (!states) {
373                 return;
374         }
375
376         i = ao2_iterator_init(states, 0);
377         while ((state = ao2_iterator_next(&i))) {
378                 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
379                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
380
381                 if (!state->client->started) {
382                         /* If the publisher client has not yet been started try to start it */
383                         if (!handler) {
384                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
385                                           publish->event, ast_sorcery_object_get_id(publish));
386                         } else if (handler->start_publishing(publish, state->client)) {
387                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
388                                         publish->event, ast_sorcery_object_get_id(publish));
389                         } else {
390                                 state->client->started = 1;
391                         }
392                 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
393                         /* If the publisher client has been started but it is going away stop it */
394                         removed->stop_publishing(state->client);
395                         state->client->started = 0;
396                         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
397                                 ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
398                                         ast_sorcery_object_get_id(publish));
399                                 ao2_ref(state->client, -1);
400                         }
401                 }
402                 ao2_ref(publish, -1);
403                 ao2_ref(state, -1);
404         }
405         ao2_iterator_destroy(&i);
406         ao2_ref(states, -1);
407 }
408
409 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
410 {
411         RAII_VAR(struct ao2_container *, states,
412                  ao2_global_obj_ref(current_states), ao2_cleanup);
413         RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
414
415         if (!states) {
416                 return NULL;
417         }
418
419         state = ao2_find(states, name, OBJ_SEARCH_KEY);
420         if (!state) {
421                 return NULL;
422         }
423
424         ao2_ref(state->client, +1);
425         return state->client;
426 }
427
428 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
429 {
430         struct ast_sip_event_publisher_handler *existing;
431         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
432
433         if (!handler->start_publishing || !handler->stop_publishing) {
434                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
435                 return -1;
436         } else if (ast_strlen_zero(handler->event_name)) {
437                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
438                 return -1;
439         }
440
441         existing = find_publisher_handler_for_event_name(handler->event_name);
442         if (existing) {
443                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
444                                 "A handler is already registered\n", handler->event_name);
445                 return -1;
446         }
447
448         sub_add_handler(handler);
449
450         sip_outbound_publish_synchronize(NULL);
451
452         return 0;
453 }
454
455 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
456 {
457         struct ast_sip_event_publisher_handler *iter;
458         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
459         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
460                 if (handler == iter) {
461                         AST_RWLIST_REMOVE_CURRENT(next);
462                         ast_module_unref(ast_module_info->self);
463                         break;
464                 }
465         }
466         AST_RWLIST_TRAVERSE_SAFE_END;
467
468         sip_outbound_publish_synchronize(handler);
469 }
470
471 /*! \brief Destructor function for publish information */
472 static void sip_outbound_publish_destroy(void *obj)
473 {
474         struct ast_sip_outbound_publish *publish = obj;
475
476         ast_sip_auth_vector_destroy(&publish->outbound_auths);
477
478         ast_string_field_free_memory(publish);
479 }
480
481 /*! \brief Allocator function for publish information */
482 static void *sip_outbound_publish_alloc(const char *name)
483 {
484         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
485                 sip_outbound_publish_destroy);
486
487         if (!publish || ast_string_field_init(publish, 256)) {
488                 ao2_cleanup(publish);
489                 return NULL;
490         }
491
492         return publish;
493 }
494
495 static void sip_outbound_publish_datastore_destroy(void *obj)
496 {
497         struct ast_datastore *datastore = obj;
498
499         /* Using the destroy function (if present) destroy the data */
500         if (datastore->info->destroy != NULL && datastore->data != NULL) {
501                 datastore->info->destroy(datastore->data);
502                 datastore->data = NULL;
503         }
504
505         ast_free((void *) datastore->uid);
506         datastore->uid = NULL;
507 }
508
509 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
510 {
511         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
512         const char *uid_ptr = uid;
513         char uuid_buf[AST_UUID_STR_LEN];
514
515         if (!info) {
516                 return NULL;
517         }
518
519         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
520         if (!datastore) {
521                 return NULL;
522         }
523
524         datastore->info = info;
525         if (ast_strlen_zero(uid)) {
526                 /* They didn't provide an ID so we'll provide one ourself */
527                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
528         }
529
530         datastore->uid = ast_strdup(uid_ptr);
531         if (!datastore->uid) {
532                 return NULL;
533         }
534
535         ao2_ref(datastore, +1);
536         return datastore;
537 }
538
539 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
540         struct ast_datastore *datastore)
541 {
542         ast_assert(datastore != NULL);
543         ast_assert(datastore->info != NULL);
544         ast_assert(!ast_strlen_zero(datastore->uid));
545
546         if (!ao2_link(client->datastores, datastore)) {
547                 return -1;
548         }
549         return 0;
550 }
551
552 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
553         const char *name)
554 {
555         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
556 }
557
558 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
559         const char *name)
560 {
561         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
562 }
563
564 static int sip_publish_client_service_queue(void *data)
565 {
566         RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
567         SCOPED_AO2LOCK(lock, client);
568         struct sip_outbound_publish_message *message;
569         pjsip_tx_data *tdata;
570         pj_status_t status;
571
572         if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
573                 return 0;
574         }
575
576         if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
577                 goto fatal;
578         }
579
580         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
581                 ast_sip_add_body(tdata, &message->body)) {
582                 pjsip_tx_data_dec_ref(tdata);
583                 goto fatal;
584         }
585
586         if (!ast_strlen_zero(client->transport_name)) {
587                 pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
588                 ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector);
589                 pjsip_tx_data_set_transport(tdata, &selector);
590         }
591
592         status = pjsip_publishc_send(client->client, tdata);
593         if (status == PJ_EBUSY) {
594                 /* We attempted to send the message but something else got there first */
595                 goto service;
596         } else if (status != PJ_SUCCESS) {
597                 goto fatal;
598         }
599
600         client->sending = message;
601
602         return 0;
603
604 fatal:
605         AST_LIST_REMOVE_HEAD(&client->queue, entry);
606         ast_free(message);
607
608 service:
609         if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
610                 ao2_ref(client, -1);
611         }
612         return -1;
613 }
614
615 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
616         const struct ast_sip_body *body)
617 {
618         SCOPED_AO2LOCK(lock, client);
619         struct sip_outbound_publish_message *message;
620         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
621         int res;
622
623         if (!client->client) {
624                 return -1;
625         }
626
627         /* If a body is present we need more space for the contents of it */
628         if (body) {
629                 type_len = strlen(body->type) + 1;
630                 subtype_len = strlen(body->subtype) + 1;
631                 body_text_len = strlen(body->body_text) + 1;
632         }
633
634         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
635         if (!message) {
636                 return -1;
637         }
638
639         if (body) {
640                 char *dst = message->body_contents;
641
642                 message->body.type = strcpy(dst, body->type);
643                 dst += type_len;
644                 message->body.subtype = strcpy(dst, body->subtype);
645                 dst += subtype_len;
646                 message->body.body_text = strcpy(dst, body->body_text);
647         }
648
649         AST_LIST_INSERT_TAIL(&client->queue, message, entry);
650
651         res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
652         if (res) {
653                 ao2_ref(client, -1);
654         }
655
656         return res;
657 }
658
659 /*! \brief Destructor function for publish client */
660 static void sip_outbound_publish_client_destroy(void *obj)
661 {
662         struct ast_sip_outbound_publish_client *client = obj;
663         struct sip_outbound_publish_message *message;
664
665         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
666
667         while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
668                 ast_free(message);
669         }
670
671         ao2_cleanup(client->datastores);
672         ao2_cleanup(client->publish);
673         ast_free(client->transport_name);
674
675         /* if unloading the module and all objects have been unpublished
676            send the signal to finish unloading */
677         if (unloading.is_unloading) {
678                 ast_mutex_lock(&unloading.lock);
679                 if (--unloading.count == 0) {
680                         ast_cond_signal(&unloading.cond);
681                 }
682                 ast_mutex_unlock(&unloading.lock);
683         }
684 }
685
686 static int explicit_publish_destroy(void *data)
687 {
688         struct ast_sip_outbound_publish_client *client = data;
689
690         pjsip_publishc_destroy(client->client);
691         ao2_ref(client, -1);
692
693         return 0;
694 }
695
696 /*! \brief Helper function which cancels and un-publishes a no longer used client */
697 static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
698 {
699         struct ast_sip_event_publisher_handler *handler;
700         SCOPED_AO2LOCK(lock, client);
701
702         if (!client->started) {
703                 /* If the client was never started, there's nothing to unpublish, so just
704                  * destroy the publication and remove its reference to the client.
705                  */
706                 ast_sip_push_task(NULL, explicit_publish_destroy, client);
707                 return 0;
708         }
709
710         handler = find_publisher_handler_for_event_name(client->publish->event);
711         if (handler) {
712                 handler->stop_publishing(client);
713         }
714
715         client->started = 0;
716         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
717                 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
718                         ast_sorcery_object_get_id(client->publish));
719                 ao2_ref(client, -1);
720         }
721
722         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
723         if (!client->sending) {
724                 if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
725                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
726                                 ast_sorcery_object_get_id(client->publish));
727                         ao2_ref(client, -1);
728                 }
729         }
730         client->destroy = 1;
731         return 0;
732 }
733
734 /*! \brief Destructor function for publish state */
735 static void sip_outbound_publish_state_destroy(void *obj)
736 {
737         struct ast_sip_outbound_publish_state *state = obj;
738
739         cancel_and_unpublish(state->client);
740         ao2_cleanup(state->client);
741 }
742
743 /*!
744  * \internal
745  * \brief Check if a publish can be reused
746  *
747  * This checks if the existing outbound publish's configuration differs from a newly-applied
748  * outbound publish.
749  *
750  * \param existing The pre-existing outbound publish
751  * \param applied The newly-created publish
752  */
753 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
754 {
755         int i;
756
757         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
758                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
759                 strcmp(existing->event, applied->event) ||
760                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
761                 return 0;
762         }
763
764         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
765                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
766                         return 0;
767                 }
768         }
769
770         return 1;
771 }
772
773 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
774
775 /*! \brief Helper function that allocates a pjsip publish client and configures it */
776 static int sip_outbound_publish_client_alloc(void *data)
777 {
778         struct ast_sip_outbound_publish_client *client = data;
779         RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
780         pjsip_publishc_opt opt = {
781                 .queue_request = PJ_FALSE,
782         };
783         pj_str_t event, server_uri, to_uri, from_uri;
784         pj_status_t status;
785
786         if (client->client) {
787                 return 0;
788         } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
789                 &client->client) != PJ_SUCCESS) {
790                 ao2_ref(client, -1);
791                 return -1;
792         }
793
794         publish = ao2_bump(client->publish);
795
796         if (!ast_strlen_zero(publish->outbound_proxy)) {
797                 pjsip_route_hdr route_set, *route;
798                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
799
800                 pj_list_init(&route_set);
801
802                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
803                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
804                         pjsip_publishc_destroy(client->client);
805                         return -1;
806                 }
807                 pj_list_insert_nodes_before(&route_set, route);
808
809                 pjsip_publishc_set_route_set(client->client, &route_set);
810         }
811
812         pj_cstr(&event, publish->event);
813         pj_cstr(&server_uri, publish->server_uri);
814         pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
815         pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
816
817         status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
818                 publish->expiration);
819         if (status == PJSIP_EINVALIDURI) {
820                 pj_pool_t *pool;
821                 pj_str_t tmp;
822                 pjsip_uri *uri;
823
824                 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
825                 if (!pool) {
826                         ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
827                                 ast_sorcery_object_get_id(publish));
828                         pjsip_publishc_destroy(client->client);
829                         return -1;
830                 }
831
832                 pj_strdup2_with_null(pool, &tmp, publish->server_uri);
833                 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
834                 if (!uri) {
835                         ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
836                                 publish->server_uri, ast_sorcery_object_get_id(publish));
837                 }
838
839                 if (!ast_strlen_zero(publish->to_uri)) {
840                         pj_strdup2_with_null(pool, &tmp, publish->to_uri);
841                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
842                         if (!uri) {
843                                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
844                                         publish->to_uri, ast_sorcery_object_get_id(publish));
845                         }
846                 }
847
848                 if (!ast_strlen_zero(publish->from_uri)) {
849                         pj_strdup2_with_null(pool, &tmp, publish->from_uri);
850                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
851                         if (!uri) {
852                                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
853                                         publish->from_uri, ast_sorcery_object_get_id(publish));
854                         }
855                 }
856
857                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
858                 pjsip_publishc_destroy(client->client);
859                 return -1;
860         } else if (status != PJ_SUCCESS) {
861                 pjsip_publishc_destroy(client->client);
862                 return -1;
863         }
864
865         return 0;
866 }
867
868 /*! \brief Callback function for publish client responses */
869 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
870 {
871         RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
872         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
873         SCOPED_AO2LOCK(lock, client);
874         pjsip_tx_data *tdata;
875
876         if (client->destroy) {
877                 if (client->sending) {
878                         client->sending = NULL;
879
880                         if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
881                                 return;
882                         }
883                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
884                                 ast_sorcery_object_get_id(publish));
885                         ao2_ref(client, -1);
886                 }
887                 /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
888                 pjsip_publishc_destroy(client->client);
889                 ao2_ref(client, -1);
890                 return;
891         }
892
893         if (param->code == 401 || param->code == 407) {
894                 pjsip_transaction *tsx = pjsip_rdata_get_tsx(param->rdata);
895
896                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
897                                 param->rdata, tsx->last_tx, &tdata)) {
898                         if (!ast_strlen_zero(client->transport_name)) {
899                                 pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
900                                 ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector);
901                                 pjsip_tx_data_set_transport(tdata, &selector);
902                         }
903                         pjsip_publishc_send(client->client, tdata);
904                 }
905                 client->auth_attempts++;
906
907                 if (client->auth_attempts == publish->max_auth_attempts) {
908                         pjsip_publishc_destroy(client->client);
909                         client->client = NULL;
910
911                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
912                                 ast_sorcery_object_get_id(publish));
913
914                         goto end;
915                 }
916                 return;
917         }
918
919         client->auth_attempts = 0;
920
921         if (param->code == 412) {
922                 pjsip_publishc_destroy(client->client);
923                 client->client = NULL;
924
925                 if (sip_outbound_publish_client_alloc(client)) {
926                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
927                                 ast_sorcery_object_get_id(publish));
928                         goto end;
929                 }
930
931                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
932                 client->sending = NULL;
933         } else if (param->code == 423) {
934                 /* Update the expiration with the new expiration time if available */
935                 pjsip_expires_hdr *expires;
936
937                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
938                 if (!expires || !expires->ivalue) {
939                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
940                                 ast_sorcery_object_get_id(publish));
941                         pjsip_publishc_destroy(client->client);
942                         client->client = NULL;
943                         goto end;
944                 }
945
946                 pjsip_publishc_update_expires(client->client, expires->ivalue);
947                 client->sending = NULL;
948         } else if (client->sending) {
949                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
950                 AST_LIST_REMOVE_HEAD(&client->queue, entry);
951                 ast_free(client->sending);
952                 client->sending = NULL;
953                 if (!param->rdata) {
954                         ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
955                                 ast_sorcery_object_get_id(publish));
956                 }
957         }
958
959         if (AST_LIST_EMPTY(&client->queue)) {
960                 schedule_publish_refresh(client, param->expiration);
961         }
962
963 end:
964         if (!client->client) {
965                 struct sip_outbound_publish_message *message;
966
967                 while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
968                         ast_free(message);
969                 }
970         } else {
971                 if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
972                         ao2_ref(client, -1);
973                 }
974         }
975 }
976
977 #define DATASTORE_BUCKETS 53
978
979 static int datastore_hash(const void *obj, int flags)
980 {
981         const struct ast_datastore *datastore;
982         const char *uid;
983
984         switch (flags & OBJ_SEARCH_MASK) {
985         case OBJ_SEARCH_KEY:
986                 uid = obj;
987                 break;
988         case OBJ_SEARCH_OBJECT:
989                 datastore = obj;
990                 uid = datastore->uid;
991                 break;
992         default:
993                 /* Hash can only work on something with a full key. */
994                 ast_assert(0);
995                 return 0;
996         }
997
998         return ast_str_hash(uid);
999 }
1000
1001 static int datastore_cmp(void *obj, void *arg, int flags)
1002 {
1003         const struct ast_datastore *object_left = obj;
1004         const struct ast_datastore *object_right = arg;
1005         const char *right_key = arg;
1006         int cmp;
1007
1008         switch (flags & OBJ_SEARCH_MASK) {
1009         case OBJ_SEARCH_OBJECT:
1010                 right_key = object_right->uid;
1011                 /* Fall through */
1012         case OBJ_SEARCH_KEY:
1013                 cmp = strcmp(object_left->uid, right_key);
1014                 break;
1015         case OBJ_SEARCH_PARTIAL_KEY:
1016         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
1017                 break;
1018         default:
1019                 /*
1020                  * What arg points to is specific to this traversal callback
1021                  * and has no special meaning to astobj2.
1022                  */
1023                 cmp = 0;
1024                 break;
1025         }
1026         if (cmp) {
1027                 return 0;
1028         }
1029         /*
1030          * At this point the traversal callback is identical to a sorted
1031          * container.
1032          */
1033         return CMP_MATCH;
1034 }
1035
1036 /*! \brief Allocator function for publish client */
1037 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1038         struct ast_sip_outbound_publish *publish)
1039 {
1040         const char *id = ast_sorcery_object_get_id(publish);
1041         struct ast_sip_outbound_publish_state *state =
1042                 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1043
1044         if (!state) {
1045                 return NULL;
1046         }
1047
1048         state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1049         if (!state->client) {
1050                 ao2_ref(state, -1);
1051                 return NULL;
1052         }
1053
1054         state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1055         if (!state->client->datastores) {
1056                 ao2_ref(state, -1);
1057                 return NULL;
1058         }
1059
1060         state->client->timer.user_data = state->client;
1061         state->client->timer.cb = sip_outbound_publish_timer_cb;
1062         state->client->transport_name = ast_strdup(publish->transport);
1063         state->client->publish = ao2_bump(publish);
1064
1065         strcpy(state->id, id);
1066         return state;
1067 }
1068
1069 /*! \brief Apply function which finds or allocates a state structure */
1070 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1071 {
1072         RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
1073         RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
1074         struct ast_sip_outbound_publish *applied = obj;
1075
1076         if (ast_strlen_zero(applied->server_uri)) {
1077                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1078                         ast_sorcery_object_get_id(applied));
1079                 return -1;
1080         } else if (ast_strlen_zero(applied->event)) {
1081                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1082                         ast_sorcery_object_get_id(applied));
1083                 return -1;
1084         }
1085
1086         if (!new_states) {
1087                 /* make sure new_states has been allocated as we will be adding to it */
1088                 new_states = ao2_container_alloc_options(
1089                         AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1090                         outbound_publish_state_hash, outbound_publish_state_cmp);
1091
1092                 if (!new_states) {
1093                         ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1094                         return -1;
1095                 }
1096         }
1097
1098         if (states) {
1099                 state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
1100                 if (state) {
1101                         if (can_reuse_publish(state->client->publish, applied)) {
1102                                 ao2_replace(state->client->publish, applied);
1103                         } else {
1104                                 ao2_ref(state, -1);
1105                                 state = NULL;
1106                         }
1107                 }
1108         }
1109
1110         if (!state) {
1111                 state = sip_outbound_publish_state_alloc(applied);
1112                 if (!state) {
1113                         ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1114                                 ast_sorcery_object_get_id(applied));
1115                         return -1;
1116                 };
1117         }
1118
1119         if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
1120                 ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
1121                         ast_sorcery_object_get_id(applied));
1122                 return -1;
1123         }
1124
1125         ao2_link(new_states, state);
1126         return 0;
1127 }
1128
1129 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1130 {
1131         struct ast_sip_outbound_publish *publish = obj;
1132
1133         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
1134 }
1135
1136 static int load_module(void)
1137 {
1138         CHECK_PJSIP_MODULE_LOADED();
1139
1140         ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1141         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1142
1143         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1144                 sip_outbound_publish_apply)) {
1145                 return AST_MODULE_LOAD_DECLINE;
1146         }
1147
1148         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1149         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1150         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1151         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1152         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1153         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1154         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1155         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1156         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
1157         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1158
1159         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1160
1161         AST_RWLIST_RDLOCK(&publisher_handlers);
1162         sip_outbound_publish_synchronize(NULL);
1163         AST_RWLIST_UNLOCK(&publisher_handlers);
1164
1165         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1166
1167         return AST_MODULE_LOAD_SUCCESS;
1168 }
1169
1170 static int reload_module(void)
1171 {
1172         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1173
1174         AST_RWLIST_RDLOCK(&publisher_handlers);
1175         sip_outbound_publish_synchronize(NULL);
1176         AST_RWLIST_UNLOCK(&publisher_handlers);
1177         return 0;
1178 }
1179
1180 static int unload_module(void)
1181 {
1182         struct timeval start = ast_tvnow();
1183         struct timespec end = {
1184                 .tv_sec = start.tv_sec + 10,
1185                 .tv_nsec = start.tv_usec * 1000
1186         };
1187         int res = 0;
1188         struct ao2_container *states = ao2_global_obj_ref(current_states);
1189
1190         if (!states || !(unloading.count = ao2_container_count(states))) {
1191                 return 0;
1192         }
1193         ao2_ref(states, -1);
1194
1195         ast_mutex_init(&unloading.lock);
1196         ast_cond_init(&unloading.cond, NULL);
1197         ast_mutex_lock(&unloading.lock);
1198
1199         unloading.is_unloading = 1;
1200         ao2_global_obj_release(current_states);
1201
1202         /* wait for items to unpublish */
1203         ast_verb(5, "Waiting to complete unpublishing task(s)\n");
1204         while (unloading.count) {
1205                 res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
1206         }
1207         ast_mutex_unlock(&unloading.lock);
1208
1209         ast_mutex_destroy(&unloading.lock);
1210         ast_cond_destroy(&unloading.cond);
1211
1212         if (res) {
1213                 ast_verb(5, "At least %d items were unable to unpublish "
1214                         "in the allowed time\n", unloading.count);
1215         } else {
1216                 ast_verb(5, "All items successfully unpublished\n");
1217         }
1218
1219         return res;
1220 }
1221
1222 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1223         .load = load_module,
1224         .reload = reload_module,
1225         .unload = unload_module,
1226         .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1227 );