9073f7c4f9ec61a37412f6822dd5abf49f17b1a6
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <support_level>core</support_level>
23  ***/
24
25 #include "asterisk.h"
26
27 #include <pjsip.h>
28 #include <pjsip_simple.h>
29
30 #include "asterisk/res_pjsip.h"
31 #include "asterisk/res_pjsip_outbound_publish.h"
32 #include "asterisk/module.h"
33 #include "asterisk/taskprocessor.h"
34 #include "asterisk/datastore.h"
35
36 /*** DOCUMENTATION
37         <configInfo name="res_pjsip_outbound_publish" language="en_US">
38                 <synopsis>SIP resource for outbound publish</synopsis>
39                 <description><para>
40                         <emphasis>Outbound Publish</emphasis>
41                         </para>
42                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
43                 </description>
44                 <configFile name="pjsip.conf">
45                         <configObject name="outbound-publish">
46                                 <synopsis>The configuration for outbound publish</synopsis>
47                                 <description><para>
48                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
49                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
50                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
51                                 </para></description>
52                                 <configOption name="expiration" default="3600">
53                                         <synopsis>Expiration time for publications in seconds</synopsis>
54                                 </configOption>
55                                 <configOption name="outbound_auth" default="">
56                                         <synopsis>Authentication object to be used for outbound publishes.</synopsis>
57                                 </configOption>
58                                 <configOption name="outbound_proxy" default="">
59                                         <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
60                                 </configOption>
61                                 <configOption name="server_uri">
62                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
63                                         <description><para>
64                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
65                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
66                                         </para></description>
67                                 </configOption>
68                                 <configOption name="from_uri">
69                                         <synopsis>SIP URI to use in the From header</synopsis>
70                                         <description><para>
71                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
72                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
73                                                 will be used.
74                                         </para></description>
75                                 </configOption>
76                                 <configOption name="to_uri">
77                                         <synopsis>SIP URI to use in the To header</synopsis>
78                                         <description><para>
79                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
80                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
81                                                 will be used.
82                                         </para></description>
83                                 </configOption>
84                                 <configOption name="event" default="">
85                                         <synopsis>Event type of the PUBLISH.</synopsis>
86                                 </configOption>
87                                 <configOption name="max_auth_attempts" default="5">
88                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
89                                 </configOption>
90                                 <configOption name="type">
91                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
92                                 </configOption>
93                         </configObject>
94                 </configFile>
95         </configInfo>
96  ***/
97
98 /*! \brief Queued outbound publish message */
99 struct sip_outbound_publish_message {
100         /*! \brief Optional body */
101         struct ast_sip_body body;
102         /*! \brief Linked list information */
103         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
104         /*! \brief Extra space for body contents */
105         char body_contents[0];
106 };
107
108 /*! \brief Outbound publish information */
109 struct ast_sip_outbound_publish {
110         /*! \brief Sorcery object details */
111         SORCERY_OBJECT(details);
112         /*! \brief Stringfields */
113         AST_DECLARE_STRING_FIELDS(
114                 /*! \brief URI for the entity and server */
115                 AST_STRING_FIELD(server_uri);
116                 /*! \brief URI for the From header */
117                 AST_STRING_FIELD(from_uri);
118                 /*! \brief URI for the To header */
119                 AST_STRING_FIELD(to_uri);
120                 /*! \brief Outbound proxy to use */
121                 AST_STRING_FIELD(outbound_proxy);
122                 /*! \brief The event type to publish */
123                 AST_STRING_FIELD(event);
124         );
125         /*! \brief Requested expiration time */
126         unsigned int expiration;
127         /*! \brief Maximum number of auth attempts before stopping the publish client */
128         unsigned int max_auth_attempts;
129         /*! \brief Configured authentication credentials */
130         struct ast_sip_auth_vector outbound_auths;
131 };
132
133 /*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
134 struct ast_sip_outbound_publish_client {
135         /*! \brief Underlying publish client */
136         pjsip_publishc *client;
137         /*! \brief Timer entry for refreshing publish */
138         pj_timer_entry timer;
139         /*! \brief Publisher datastores set up by handlers */
140         struct ao2_container *datastores;
141         /*! \brief The number of auth attempts done */
142         unsigned int auth_attempts;
143         /*! \brief Queue of outgoing publish messages to send*/
144         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
145         /*! \brief The message currently being sent */
146         struct sip_outbound_publish_message *sending;
147         /*! \brief Publish client has been fully started and event type informed */
148         unsigned int started;
149         /*! \brief Publish client should be destroyed */
150         unsigned int destroy;
151         /*! \brief Outbound publish information */
152         struct ast_sip_outbound_publish *publish;
153 };
154
155 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
156 struct ast_sip_outbound_publish_state {
157         /*! \brief Outbound publish client */
158         struct ast_sip_outbound_publish_client *client;
159         /* publish state id lookup key - same as publish configuration id */
160         char id[0];
161 };
162
163 /*! \brief Unloading data */
164 struct unloading_data {
165         int is_unloading;
166         int count;
167         ast_mutex_t lock;
168         ast_cond_t cond;
169 } unloading;
170
171 /*! \brief Default number of client state container buckets */
172 #define DEFAULT_STATE_BUCKETS 31
173 static AO2_GLOBAL_OBJ_STATIC(current_states);
174 /*! \brief Used on [re]loads to hold new state data */
175 static struct ao2_container *new_states;
176
177 /*! \brief hashing function for state objects */
178 static int outbound_publish_state_hash(const void *obj, const int flags)
179 {
180         const struct ast_sip_outbound_publish_state *object;
181         const char *key;
182
183         switch (flags & OBJ_SEARCH_MASK) {
184         case OBJ_SEARCH_KEY:
185                 key = obj;
186                 break;
187         case OBJ_SEARCH_OBJECT:
188                 object = obj;
189                 key = object->id;
190                 break;
191         default:
192                 ast_assert(0);
193                 return 0;
194         }
195         return ast_str_hash(key);
196 }
197
198 /*! \brief comparator function for client objects */
199 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
200 {
201         const struct ast_sip_outbound_publish_state *object_left = obj;
202         const struct ast_sip_outbound_publish_state *object_right = arg;
203         const char *right_key = arg;
204         int cmp;
205
206         switch (flags & OBJ_SEARCH_MASK) {
207         case OBJ_SEARCH_OBJECT:
208                 right_key = object_right->id;
209                 /* Fall through */
210         case OBJ_SEARCH_KEY:
211                 cmp = strcmp(object_left->id, right_key);
212                 break;
213         case OBJ_SEARCH_PARTIAL_KEY:
214                 /* Not supported by container. */
215                 ast_assert(0);
216                 return 0;
217         default:
218                 cmp = 0;
219                 break;
220         }
221         if (cmp) {
222                 return 0;
223         }
224         return CMP_MATCH;
225 }
226
227 static struct ao2_container *get_publishes_and_update_state(void)
228 {
229         struct ao2_container *container;
230
231         container = ast_sorcery_retrieve_by_fields(
232                 ast_sip_get_sorcery(), "outbound-publish",
233                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
234
235         if (!new_states) {
236                 return container;
237         }
238
239         ao2_global_obj_replace_unref(current_states, new_states);
240         ao2_cleanup(new_states);
241         new_states = NULL;
242
243         return container;
244 }
245
246 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
247
248 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
249 {
250         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
251         ast_module_ref(ast_module_info->self);
252 }
253
254 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
255 {
256         struct ast_sip_event_publisher_handler *iter;
257
258         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
259                 if (!strcmp(iter->event_name, event_name)) {
260                         break;
261                 }
262         }
263         return iter;
264 }
265
266 /*! \brief Helper function which cancels the refresh timer on a client */
267 static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
268 {
269         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
270                 /* The timer was successfully cancelled, drop the refcount of the client */
271                 ao2_ref(client, -1);
272         }
273 }
274
275 /*! \brief Helper function which sets up the timer to send publication */
276 static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, pjsip_rx_data *rdata)
277 {
278         struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
279         pj_time_val delay = { .sec = 0, };
280         pjsip_expires_hdr *expires;
281
282         cancel_publish_refresh(client);
283
284         /* Determine when we should refresh - we favor the Expires header if possible */
285         expires = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
286         if (expires) {
287                 delay.sec = expires->ivalue - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
288         }
289         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
290                 delay.sec = publish->expiration;
291         }
292         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
293                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
294         }
295
296         ao2_ref(client, +1);
297         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
298                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
299                 ao2_ref(client, -1);
300         }
301         ao2_ref(publish, -1);
302 }
303
304 /*! \brief Publish client timer callback function */
305 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
306 {
307         struct ast_sip_outbound_publish_client *client = entry->user_data;
308
309         ao2_lock(client);
310         if (AST_LIST_EMPTY(&client->queue)) {
311                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
312                 ast_sip_publish_client_send(client, NULL);
313         }
314         ao2_unlock(client);
315
316         ao2_ref(client, -1);
317 }
318
319 /*! \brief Task for cancelling a refresh timer */
320 static int cancel_refresh_timer_task(void *data)
321 {
322         struct ast_sip_outbound_publish_client *client = data;
323
324         cancel_publish_refresh(client);
325         ao2_ref(client, -1);
326
327         return 0;
328 }
329
330 /*! \brief Task for sending an unpublish */
331 static int send_unpublish_task(void *data)
332 {
333         struct ast_sip_outbound_publish_client *client = data;
334         pjsip_tx_data *tdata;
335
336         if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
337                 pjsip_publishc_send(client->client, tdata);
338         }
339
340         ao2_ref(client, -1);
341
342         return 0;
343 }
344
345 /*! \brief Helper function which starts or stops publish clients when applicable */
346 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
347 {
348         RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
349         struct ao2_container *states;
350         struct ao2_iterator i;
351         struct ast_sip_outbound_publish_state *state;
352
353         if (!publishes) {
354                 return;
355         }
356
357         states = ao2_global_obj_ref(current_states);
358         if (!states) {
359                 return;
360         }
361
362         i = ao2_iterator_init(states, 0);
363         while ((state = ao2_iterator_next(&i))) {
364                 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
365                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
366
367                 if (!state->client->started) {
368                         /* If the publisher client has not yet been started try to start it */
369                         if (!handler) {
370                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
371                                           publish->event, ast_sorcery_object_get_id(publish));
372                         } else if (handler->start_publishing(publish, state->client)) {
373                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
374                                         publish->event, ast_sorcery_object_get_id(publish));
375                         } else {
376                                 state->client->started = 1;
377                         }
378                 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
379                         /* If the publisher client has been started but it is going away stop it */
380                         removed->stop_publishing(state->client);
381                         state->client->started = 0;
382                         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
383                                 ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
384                                         ast_sorcery_object_get_id(publish));
385                                 ao2_ref(state->client, -1);
386                         }
387                 }
388                 ao2_ref(publish, -1);
389                 ao2_ref(state, -1);
390         }
391         ao2_iterator_destroy(&i);
392         ao2_ref(states, -1);
393 }
394
395 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
396 {
397         RAII_VAR(struct ao2_container *, states,
398                  ao2_global_obj_ref(current_states), ao2_cleanup);
399         RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
400
401         if (!states) {
402                 return NULL;
403         }
404
405         state = ao2_find(states, name, OBJ_SEARCH_KEY);
406         if (!state) {
407                 return NULL;
408         }
409
410         ao2_ref(state->client, +1);
411         return state->client;
412 }
413
414 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
415 {
416         struct ast_sip_event_publisher_handler *existing;
417         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
418
419         if (!handler->start_publishing || !handler->stop_publishing) {
420                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
421                 return -1;
422         } else if (ast_strlen_zero(handler->event_name)) {
423                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
424                 return -1;
425         }
426
427         existing = find_publisher_handler_for_event_name(handler->event_name);
428         if (existing) {
429                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
430                                 "A handler is already registered\n", handler->event_name);
431                 return -1;
432         }
433
434         sub_add_handler(handler);
435
436         sip_outbound_publish_synchronize(NULL);
437
438         return 0;
439 }
440
441 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
442 {
443         struct ast_sip_event_publisher_handler *iter;
444         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
445         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
446                 if (handler == iter) {
447                         AST_RWLIST_REMOVE_CURRENT(next);
448                         ast_module_unref(ast_module_info->self);
449                         break;
450                 }
451         }
452         AST_RWLIST_TRAVERSE_SAFE_END;
453
454         sip_outbound_publish_synchronize(handler);
455 }
456
457 /*! \brief Destructor function for publish information */
458 static void sip_outbound_publish_destroy(void *obj)
459 {
460         struct ast_sip_outbound_publish *publish = obj;
461
462         ast_sip_auth_vector_destroy(&publish->outbound_auths);
463
464         ast_string_field_free_memory(publish);
465 }
466
467 /*! \brief Allocator function for publish information */
468 static void *sip_outbound_publish_alloc(const char *name)
469 {
470         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
471                 sip_outbound_publish_destroy);
472
473         if (!publish || ast_string_field_init(publish, 256)) {
474                 ao2_cleanup(publish);
475                 return NULL;
476         }
477
478         return publish;
479 }
480
481 static void sip_outbound_publish_datastore_destroy(void *obj)
482 {
483         struct ast_datastore *datastore = obj;
484
485         /* Using the destroy function (if present) destroy the data */
486         if (datastore->info->destroy != NULL && datastore->data != NULL) {
487                 datastore->info->destroy(datastore->data);
488                 datastore->data = NULL;
489         }
490
491         ast_free((void *) datastore->uid);
492         datastore->uid = NULL;
493 }
494
495 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
496 {
497         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
498         const char *uid_ptr = uid;
499         char uuid_buf[AST_UUID_STR_LEN];
500
501         if (!info) {
502                 return NULL;
503         }
504
505         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
506         if (!datastore) {
507                 return NULL;
508         }
509
510         datastore->info = info;
511         if (ast_strlen_zero(uid)) {
512                 /* They didn't provide an ID so we'll provide one ourself */
513                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
514         }
515
516         datastore->uid = ast_strdup(uid_ptr);
517         if (!datastore->uid) {
518                 return NULL;
519         }
520
521         ao2_ref(datastore, +1);
522         return datastore;
523 }
524
525 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
526         struct ast_datastore *datastore)
527 {
528         ast_assert(datastore != NULL);
529         ast_assert(datastore->info != NULL);
530         ast_assert(!ast_strlen_zero(datastore->uid));
531
532         if (!ao2_link(client->datastores, datastore)) {
533                 return -1;
534         }
535         return 0;
536 }
537
538 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
539         const char *name)
540 {
541         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
542 }
543
544 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
545         const char *name)
546 {
547         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
548 }
549
550 static int sip_publish_client_service_queue(void *data)
551 {
552         RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
553         SCOPED_AO2LOCK(lock, client);
554         struct sip_outbound_publish_message *message;
555         pjsip_tx_data *tdata;
556         pj_status_t status;
557
558         if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
559                 return 0;
560         }
561
562         if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
563                 goto fatal;
564         }
565
566         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
567                 ast_sip_add_body(tdata, &message->body)) {
568                 pjsip_tx_data_dec_ref(tdata);
569                 goto fatal;
570         }
571
572         status = pjsip_publishc_send(client->client, tdata);
573         if (status == PJ_EBUSY) {
574                 /* We attempted to send the message but something else got there first */
575                 goto service;
576         } else if (status != PJ_SUCCESS) {
577                 goto fatal;
578         }
579
580         client->sending = message;
581
582         return 0;
583
584 fatal:
585         AST_LIST_REMOVE_HEAD(&client->queue, entry);
586         ast_free(message);
587
588 service:
589         if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
590                 ao2_ref(client, -1);
591         }
592         return -1;
593 }
594
595 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
596         const struct ast_sip_body *body)
597 {
598         SCOPED_AO2LOCK(lock, client);
599         struct sip_outbound_publish_message *message;
600         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
601         int res;
602
603         if (!client->client) {
604                 return -1;
605         }
606
607         /* If a body is present we need more space for the contents of it */
608         if (body) {
609                 type_len = strlen(body->type) + 1;
610                 subtype_len = strlen(body->subtype) + 1;
611                 body_text_len = strlen(body->body_text) + 1;
612         }
613
614         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
615         if (!message) {
616                 return -1;
617         }
618
619         if (body) {
620                 char *dst = message->body_contents;
621
622                 message->body.type = strcpy(dst, body->type);
623                 dst += type_len;
624                 message->body.subtype = strcpy(dst, body->subtype);
625                 dst += subtype_len;
626                 message->body.body_text = strcpy(dst, body->body_text);
627         }
628
629         AST_LIST_INSERT_TAIL(&client->queue, message, entry);
630
631         res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
632         if (res) {
633                 ao2_ref(client, -1);
634         }
635
636         return res;
637 }
638
639 /*! \brief Destructor function for publish client */
640 static void sip_outbound_publish_client_destroy(void *obj)
641 {
642         struct ast_sip_outbound_publish_client *client = obj;
643         struct sip_outbound_publish_message *message;
644
645         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
646
647         while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
648                 ast_free(message);
649         }
650
651         ao2_cleanup(client->datastores);
652         ao2_cleanup(client->publish);
653
654         /* if unloading the module and all objects have been unpublished
655            send the signal to finish unloading */
656         if (unloading.is_unloading) {
657                 ast_mutex_lock(&unloading.lock);
658                 if (--unloading.count == 0) {
659                         ast_cond_signal(&unloading.cond);
660                 }
661                 ast_mutex_unlock(&unloading.lock);
662         }
663 }
664
665 static int explicit_publish_destroy(void *data)
666 {
667         struct ast_sip_outbound_publish_client *client = data;
668
669         pjsip_publishc_destroy(client->client);
670         ao2_ref(client, -1);
671
672         return 0;
673 }
674
675 /*! \brief Helper function which cancels and un-publishes a no longer used client */
676 static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
677 {
678         struct ast_sip_event_publisher_handler *handler;
679         SCOPED_AO2LOCK(lock, client);
680
681         if (!client->started) {
682                 /* If the client was never started, there's nothing to unpublish, so just
683                  * destroy the publication and remove its reference to the client.
684                  */
685                 ast_sip_push_task(NULL, explicit_publish_destroy, client);
686                 return 0;
687         }
688
689         handler = find_publisher_handler_for_event_name(client->publish->event);
690         if (handler) {
691                 handler->stop_publishing(client);
692         }
693
694         client->started = 0;
695         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
696                 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
697                         ast_sorcery_object_get_id(client->publish));
698                 ao2_ref(client, -1);
699         }
700
701         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
702         if (!client->sending) {
703                 if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
704                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
705                                 ast_sorcery_object_get_id(client->publish));
706                         ao2_ref(client, -1);
707                 }
708         }
709         client->destroy = 1;
710         return 0;
711 }
712
713 /*! \brief Destructor function for publish state */
714 static void sip_outbound_publish_state_destroy(void *obj)
715 {
716         struct ast_sip_outbound_publish_state *state = obj;
717
718         cancel_and_unpublish(state->client);
719         ao2_cleanup(state->client);
720 }
721
722 /*!
723  * \internal
724  * \brief Check if a publish can be reused
725  *
726  * This checks if the existing outbound publish's configuration differs from a newly-applied
727  * outbound publish.
728  *
729  * \param existing The pre-existing outbound publish
730  * \param applied The newly-created publish
731  */
732 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
733 {
734         int i;
735
736         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
737                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
738                 strcmp(existing->event, applied->event) ||
739                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
740                 return 0;
741         }
742
743         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
744                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
745                         return 0;
746                 }
747         }
748
749         return 1;
750 }
751
752 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
753
754 /*! \brief Helper function that allocates a pjsip publish client and configures it */
755 static int sip_outbound_publish_client_alloc(void *data)
756 {
757         struct ast_sip_outbound_publish_client *client = data;
758         RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
759         pjsip_publishc_opt opt = {
760                 .queue_request = PJ_FALSE,
761         };
762         pj_str_t event, server_uri, to_uri, from_uri;
763         pj_status_t status;
764
765         if (client->client) {
766                 return 0;
767         } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
768                 &client->client) != PJ_SUCCESS) {
769                 ao2_ref(client, -1);
770                 return -1;
771         }
772
773         publish = ao2_bump(client->publish);
774
775         if (!ast_strlen_zero(publish->outbound_proxy)) {
776                 pjsip_route_hdr route_set, *route;
777                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
778
779                 pj_list_init(&route_set);
780
781                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
782                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
783                         pjsip_publishc_destroy(client->client);
784                         return -1;
785                 }
786                 pj_list_insert_nodes_before(&route_set, route);
787
788                 pjsip_publishc_set_route_set(client->client, &route_set);
789         }
790
791         pj_cstr(&event, publish->event);
792         pj_cstr(&server_uri, publish->server_uri);
793         pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
794         pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
795
796         status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
797                 publish->expiration);
798         if (status == PJSIP_EINVALIDURI) {
799                 pj_pool_t *pool;
800                 pj_str_t tmp;
801                 pjsip_uri *uri;
802
803                 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
804                 if (!pool) {
805                         ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
806                                 ast_sorcery_object_get_id(publish));
807                         pjsip_publishc_destroy(client->client);
808                         return -1;
809                 }
810
811                 pj_strdup2_with_null(pool, &tmp, publish->server_uri);
812                 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
813                 if (!uri) {
814                         ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
815                                 publish->server_uri, ast_sorcery_object_get_id(publish));
816                 }
817
818                 if (!ast_strlen_zero(publish->to_uri)) {
819                         pj_strdup2_with_null(pool, &tmp, publish->to_uri);
820                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
821                         if (!uri) {
822                                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
823                                         publish->to_uri, ast_sorcery_object_get_id(publish));
824                         }
825                 }
826
827                 if (!ast_strlen_zero(publish->from_uri)) {
828                         pj_strdup2_with_null(pool, &tmp, publish->from_uri);
829                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
830                         if (!uri) {
831                                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
832                                         publish->from_uri, ast_sorcery_object_get_id(publish));
833                         }
834                 }
835
836                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
837                 pjsip_publishc_destroy(client->client);
838                 return -1;
839         } else if (status != PJ_SUCCESS) {
840                 pjsip_publishc_destroy(client->client);
841                 return -1;
842         }
843
844         return 0;
845 }
846
847 /*! \brief Callback function for publish client responses */
848 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
849 {
850         RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
851         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
852         SCOPED_AO2LOCK(lock, client);
853         pjsip_tx_data *tdata;
854
855         if (client->destroy) {
856                 if (client->sending) {
857                         client->sending = NULL;
858
859                         if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
860                                 return;
861                         }
862                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
863                                 ast_sorcery_object_get_id(publish));
864                         ao2_ref(client, -1);
865                 }
866                 /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
867                 pjsip_publishc_destroy(client->client);
868                 ao2_ref(client, -1);
869                 return;
870         }
871
872         if (param->code == 401 || param->code == 407) {
873                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
874                                 param->rdata, pjsip_rdata_get_tsx(param->rdata), &tdata)) {
875                         pjsip_publishc_send(client->client, tdata);
876                 }
877                 client->auth_attempts++;
878
879                 if (client->auth_attempts == publish->max_auth_attempts) {
880                         pjsip_publishc_destroy(client->client);
881                         client->client = NULL;
882
883                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
884                                 ast_sorcery_object_get_id(publish));
885
886                         goto end;
887                 }
888                 return;
889         }
890
891         client->auth_attempts = 0;
892
893         if (param->code == 412) {
894                 pjsip_publishc_destroy(client->client);
895                 client->client = NULL;
896
897                 if (sip_outbound_publish_client_alloc(publish)) {
898                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
899                                 ast_sorcery_object_get_id(publish));
900                         goto end;
901                 }
902
903                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
904                 client->sending = NULL;
905         } else if (param->code == 423) {
906                 /* Update the expiration with the new expiration time if available */
907                 pjsip_expires_hdr *expires;
908
909                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
910                 if (!expires || !expires->ivalue) {
911                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
912                                 ast_sorcery_object_get_id(publish));
913                         pjsip_publishc_destroy(client->client);
914                         client->client = NULL;
915                         goto end;
916                 }
917
918                 pjsip_publishc_update_expires(client->client, expires->ivalue);
919                 client->sending = NULL;
920         } else if (client->sending) {
921                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
922                 AST_LIST_REMOVE_HEAD(&client->queue, entry);
923                 ast_free(client->sending);
924                 client->sending = NULL;
925         }
926
927         if (AST_LIST_EMPTY(&client->queue)) {
928                 schedule_publish_refresh(client, param->rdata);
929         }
930
931 end:
932         if (!client->client) {
933                 struct sip_outbound_publish_message *message;
934
935                 while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
936                         ast_free(message);
937                 }
938         } else {
939                 if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
940                         ao2_ref(client, -1);
941                 }
942         }
943 }
944
945 #define DATASTORE_BUCKETS 53
946
947 static int datastore_hash(const void *obj, int flags)
948 {
949         const struct ast_datastore *datastore;
950         const char *uid;
951
952         switch (flags & OBJ_SEARCH_MASK) {
953         case OBJ_SEARCH_KEY:
954                 uid = obj;
955                 break;
956         case OBJ_SEARCH_OBJECT:
957                 datastore = obj;
958                 uid = datastore->uid;
959                 break;
960         default:
961                 /* Hash can only work on something with a full key. */
962                 ast_assert(0);
963                 return 0;
964         }
965
966         return ast_str_hash(uid);
967 }
968
969 static int datastore_cmp(void *obj, void *arg, int flags)
970 {
971         const struct ast_datastore *object_left = obj;
972         const struct ast_datastore *object_right = arg;
973         const char *right_key = arg;
974         int cmp;
975
976         switch (flags & OBJ_SEARCH_MASK) {
977         case OBJ_SEARCH_OBJECT:
978                 right_key = object_right->uid;
979                 /* Fall through */
980         case OBJ_SEARCH_KEY:
981                 cmp = strcmp(object_left->uid, right_key);
982                 break;
983         case OBJ_SEARCH_PARTIAL_KEY:
984         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
985                 break;
986         default:
987                 /*
988                  * What arg points to is specific to this traversal callback
989                  * and has no special meaning to astobj2.
990                  */
991                 cmp = 0;
992                 break;
993         }
994         if (cmp) {
995                 return 0;
996         }
997         /*
998          * At this point the traversal callback is identical to a sorted
999          * container.
1000          */
1001         return CMP_MATCH;
1002 }
1003
1004 /*! \brief Allocator function for publish client */
1005 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1006         struct ast_sip_outbound_publish *publish)
1007 {
1008         const char *id = ast_sorcery_object_get_id(publish);
1009         struct ast_sip_outbound_publish_state *state =
1010                 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1011
1012         if (!state) {
1013                 return NULL;
1014         }
1015
1016         state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1017         if (!state->client) {
1018                 ao2_ref(state, -1);
1019                 return NULL;
1020         }
1021
1022         state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1023         if (!state->client->datastores) {
1024                 ao2_ref(state, -1);
1025                 return NULL;
1026         }
1027
1028         state->client->timer.user_data = state->client;
1029         state->client->timer.cb = sip_outbound_publish_timer_cb;
1030         state->client->publish = ao2_bump(publish);
1031
1032         strcpy(state->id, id);
1033         return state;
1034 }
1035
1036 /*! \brief Apply function which finds or allocates a state structure */
1037 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1038 {
1039         RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
1040         RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
1041         struct ast_sip_outbound_publish *applied = obj;
1042
1043         if (ast_strlen_zero(applied->server_uri)) {
1044                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1045                         ast_sorcery_object_get_id(applied));
1046                 return -1;
1047         } else if (ast_strlen_zero(applied->event)) {
1048                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1049                         ast_sorcery_object_get_id(applied));
1050                 return -1;
1051         }
1052
1053         if (!new_states) {
1054                 /* make sure new_states has been allocated as we will be adding to it */
1055                 new_states = ao2_container_alloc_options(
1056                         AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1057                         outbound_publish_state_hash, outbound_publish_state_cmp);
1058
1059                 if (!new_states) {
1060                         ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1061                         return -1;
1062                 }
1063         }
1064
1065         if (states) {
1066                 state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
1067                 if (state) {
1068                         if (can_reuse_publish(state->client->publish, applied)) {
1069                                 ao2_replace(state->client->publish, applied);
1070                         } else {
1071                                 ao2_ref(state, -1);
1072                                 state = NULL;
1073                         }
1074                 }
1075         }
1076
1077         if (!state) {
1078                 state = sip_outbound_publish_state_alloc(applied);
1079                 if (!state) {
1080                         ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1081                                 ast_sorcery_object_get_id(applied));
1082                         return -1;
1083                 };
1084         }
1085
1086         if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
1087                 ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
1088                         ast_sorcery_object_get_id(applied));
1089                 return -1;
1090         }
1091
1092         ao2_link(new_states, state);
1093         return 0;
1094 }
1095
1096 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1097 {
1098         struct ast_sip_outbound_publish *publish = obj;
1099
1100         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
1101 }
1102
1103 static int load_module(void)
1104 {
1105         CHECK_PJSIP_MODULE_LOADED();
1106
1107         ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1108         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1109
1110         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1111                 sip_outbound_publish_apply)) {
1112                 return AST_MODULE_LOAD_DECLINE;
1113         }
1114
1115         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1116         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1117         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1118         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1119         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1120         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1121         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1122         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1123         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1124
1125         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1126
1127         AST_RWLIST_RDLOCK(&publisher_handlers);
1128         sip_outbound_publish_synchronize(NULL);
1129         AST_RWLIST_UNLOCK(&publisher_handlers);
1130
1131         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1132
1133         return AST_MODULE_LOAD_SUCCESS;
1134 }
1135
1136 static int reload_module(void)
1137 {
1138         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1139
1140         AST_RWLIST_RDLOCK(&publisher_handlers);
1141         sip_outbound_publish_synchronize(NULL);
1142         AST_RWLIST_UNLOCK(&publisher_handlers);
1143         return 0;
1144 }
1145
1146 static int unload_module(void)
1147 {
1148         struct timeval start = ast_tvnow();
1149         struct timespec end = {
1150                 .tv_sec = start.tv_sec + 10,
1151                 .tv_nsec = start.tv_usec * 1000
1152         };
1153         int res = 0;
1154         struct ao2_container *states = ao2_global_obj_ref(current_states);
1155
1156         if (!states || !(unloading.count = ao2_container_count(states))) {
1157                 return 0;
1158         }
1159         ao2_ref(states, -1);
1160
1161         ast_mutex_init(&unloading.lock);
1162         ast_cond_init(&unloading.cond, NULL);
1163         ast_mutex_lock(&unloading.lock);
1164
1165         unloading.is_unloading = 1;
1166         ao2_global_obj_release(current_states);
1167
1168         /* wait for items to unpublish */
1169         ast_verb(5, "Waiting to complete unpublishing task(s)\n");
1170         while (unloading.count) {
1171                 res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
1172         }
1173         ast_mutex_unlock(&unloading.lock);
1174
1175         ast_mutex_destroy(&unloading.lock);
1176         ast_cond_destroy(&unloading.cond);
1177
1178         if (res) {
1179                 ast_verb(5, "At least %d items were unable to unpublish "
1180                         "in the allowed time\n", unloading.count);
1181         } else {
1182                 ast_verb(5, "All items successfully unpublished\n");
1183         }
1184
1185         return res;
1186 }
1187
1188 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1189                 .load = load_module,
1190                 .reload = reload_module,
1191                 .unload = unload_module,
1192                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1193                );