a58bcbb9165e4dc9643f87df5fda2f99d11e1d16
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <support_level>core</support_level>
23  ***/
24
25 #include "asterisk.h"
26
27 #include <pjsip.h>
28 #include <pjsip_simple.h>
29
30 #include "asterisk/res_pjsip.h"
31 #include "asterisk/res_pjsip_outbound_publish.h"
32 #include "asterisk/module.h"
33 #include "asterisk/taskprocessor.h"
34 #include "asterisk/datastore.h"
35
36 /*** DOCUMENTATION
37         <configInfo name="res_pjsip_outbound_publish" language="en_US">
38                 <synopsis>SIP resource for outbound publish</synopsis>
39                 <description><para>
40                         <emphasis>Outbound Publish</emphasis>
41                         </para>
42                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
43                 </description>
44                 <configFile name="pjsip.conf">
45                         <configObject name="outbound-publish">
46                                 <synopsis>The configuration for outbound publish</synopsis>
47                                 <description><para>
48                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
49                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
50                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
51                                 </para></description>
52                                 <configOption name="expiration" default="3600">
53                                         <synopsis>Expiration time for publications in seconds</synopsis>
54                                 </configOption>
55                                 <configOption name="outbound_auth" default="">
56                                         <synopsis>Authentication object to be used for outbound publishes.</synopsis>
57                                 </configOption>
58                                 <configOption name="outbound_proxy" default="">
59                                         <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
60                                 </configOption>
61                                 <configOption name="server_uri">
62                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
63                                         <description><para>
64                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
65                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
66                                         </para></description>
67                                 </configOption>
68                                 <configOption name="from_uri">
69                                         <synopsis>SIP URI to use in the From header</synopsis>
70                                         <description><para>
71                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
72                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
73                                                 will be used.
74                                         </para></description>
75                                 </configOption>
76                                 <configOption name="to_uri">
77                                         <synopsis>SIP URI to use in the To header</synopsis>
78                                         <description><para>
79                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
80                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
81                                                 will be used.
82                                         </para></description>
83                                 </configOption>
84                                 <configOption name="event" default="">
85                                         <synopsis>Event type of the PUBLISH.</synopsis>
86                                 </configOption>
87                                 <configOption name="max_auth_attempts" default="5">
88                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
89                                 </configOption>
90                                 <configOption name="type">
91                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
92                                 </configOption>
93                         </configObject>
94                 </configFile>
95         </configInfo>
96  ***/
97
98 /*! \brief Queued outbound publish message */
99 struct sip_outbound_publish_message {
100         /*! \brief Optional body */
101         struct ast_sip_body body;
102         /*! \brief Linked list information */
103         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
104         /*! \brief Extra space for body contents */
105         char body_contents[0];
106 };
107
108 /*! \brief Outbound publish information */
109 struct ast_sip_outbound_publish {
110         /*! \brief Sorcery object details */
111         SORCERY_OBJECT(details);
112         /*! \brief Stringfields */
113         AST_DECLARE_STRING_FIELDS(
114                 /*! \brief URI for the entity and server */
115                 AST_STRING_FIELD(server_uri);
116                 /*! \brief URI for the From header */
117                 AST_STRING_FIELD(from_uri);
118                 /*! \brief URI for the To header */
119                 AST_STRING_FIELD(to_uri);
120                 /*! \brief Outbound proxy to use */
121                 AST_STRING_FIELD(outbound_proxy);
122                 /*! \brief The event type to publish */
123                 AST_STRING_FIELD(event);
124         );
125         /*! \brief Requested expiration time */
126         unsigned int expiration;
127         /*! \brief Maximum number of auth attempts before stopping the publish client */
128         unsigned int max_auth_attempts;
129         /*! \brief Configured authentication credentials */
130         struct ast_sip_auth_vector outbound_auths;
131 };
132
133 /*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
134 struct ast_sip_outbound_publish_client {
135         /*! \brief Underlying publish client */
136         pjsip_publishc *client;
137         /*! \brief Timer entry for refreshing publish */
138         pj_timer_entry timer;
139         /*! \brief Publisher datastores set up by handlers */
140         struct ao2_container *datastores;
141         /*! \brief The number of auth attempts done */
142         unsigned int auth_attempts;
143         /*! \brief Queue of outgoing publish messages to send*/
144         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
145         /*! \brief The message currently being sent */
146         struct sip_outbound_publish_message *sending;
147         /*! \brief Publish client has been fully started and event type informed */
148         unsigned int started;
149         /*! \brief Publish client should be destroyed */
150         unsigned int destroy;
151         /*! \brief Outbound publish information */
152         struct ast_sip_outbound_publish *publish;
153 };
154
155 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
156 struct ast_sip_outbound_publish_state {
157         /*! \brief Outbound publish client */
158         struct ast_sip_outbound_publish_client *client;
159         /* publish state id lookup key - same as publish configuration id */
160         char id[0];
161 };
162
163 /*! \brief Unloading data */
164 struct unloading_data {
165         int is_unloading;
166         int count;
167         ast_mutex_t lock;
168         ast_cond_t cond;
169 } unloading;
170
171 /*! \brief Default number of client state container buckets */
172 #define DEFAULT_STATE_BUCKETS 31
173 static AO2_GLOBAL_OBJ_STATIC(current_states);
174 /*! \brief Used on [re]loads to hold new state data */
175 static struct ao2_container *new_states;
176
177 /*! \brief hashing function for state objects */
178 static int outbound_publish_state_hash(const void *obj, const int flags)
179 {
180         const struct ast_sip_outbound_publish_state *object;
181         const char *key;
182
183         switch (flags & OBJ_SEARCH_MASK) {
184         case OBJ_SEARCH_KEY:
185                 key = obj;
186                 break;
187         case OBJ_SEARCH_OBJECT:
188                 object = obj;
189                 key = object->id;
190                 break;
191         default:
192                 ast_assert(0);
193                 return 0;
194         }
195         return ast_str_hash(key);
196 }
197
198 /*! \brief comparator function for client objects */
199 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
200 {
201         const struct ast_sip_outbound_publish_state *object_left = obj;
202         const struct ast_sip_outbound_publish_state *object_right = arg;
203         const char *right_key = arg;
204         int cmp;
205
206         switch (flags & OBJ_SEARCH_MASK) {
207         case OBJ_SEARCH_OBJECT:
208                 right_key = object_right->id;
209                 /* Fall through */
210         case OBJ_SEARCH_KEY:
211                 cmp = strcmp(object_left->id, right_key);
212                 break;
213         case OBJ_SEARCH_PARTIAL_KEY:
214                 /* Not supported by container. */
215                 ast_assert(0);
216                 return 0;
217         default:
218                 cmp = 0;
219                 break;
220         }
221         if (cmp) {
222                 return 0;
223         }
224         return CMP_MATCH;
225 }
226
227 static struct ao2_container *get_publishes_and_update_state(void)
228 {
229         struct ao2_container *container;
230
231         container = ast_sorcery_retrieve_by_fields(
232                 ast_sip_get_sorcery(), "outbound-publish",
233                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
234
235         if (!new_states) {
236                 return container;
237         }
238
239         ao2_global_obj_replace_unref(current_states, new_states);
240         ao2_cleanup(new_states);
241         new_states = NULL;
242
243         return container;
244 }
245
246 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
247
248 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
249 {
250         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
251         ast_module_ref(ast_module_info->self);
252 }
253
254 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
255 {
256         struct ast_sip_event_publisher_handler *iter;
257
258         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
259                 if (!strcmp(iter->event_name, event_name)) {
260                         break;
261                 }
262         }
263         return iter;
264 }
265
266 /*! \brief Helper function which cancels the refresh timer on a client */
267 static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
268 {
269         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
270                 /* The timer was successfully cancelled, drop the refcount of the client */
271                 ao2_ref(client, -1);
272         }
273 }
274
275 /*! \brief Helper function which sets up the timer to send publication */
276 static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration)
277 {
278         struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
279         pj_time_val delay = { .sec = 0, };
280
281         cancel_publish_refresh(client);
282
283         if (expiration > 0) {
284                 delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
285         }
286         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
287                 delay.sec = publish->expiration;
288         }
289         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
290                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
291         }
292
293         ao2_ref(client, +1);
294         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
295                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
296                 ao2_ref(client, -1);
297         }
298         ao2_ref(publish, -1);
299 }
300
301 /*! \brief Publish client timer callback function */
302 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
303 {
304         struct ast_sip_outbound_publish_client *client = entry->user_data;
305
306         ao2_lock(client);
307         if (AST_LIST_EMPTY(&client->queue)) {
308                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
309                 ast_sip_publish_client_send(client, NULL);
310         }
311         ao2_unlock(client);
312
313         ao2_ref(client, -1);
314 }
315
316 /*! \brief Task for cancelling a refresh timer */
317 static int cancel_refresh_timer_task(void *data)
318 {
319         struct ast_sip_outbound_publish_client *client = data;
320
321         cancel_publish_refresh(client);
322         ao2_ref(client, -1);
323
324         return 0;
325 }
326
327 /*! \brief Task for sending an unpublish */
328 static int send_unpublish_task(void *data)
329 {
330         struct ast_sip_outbound_publish_client *client = data;
331         pjsip_tx_data *tdata;
332
333         if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
334                 pjsip_publishc_send(client->client, tdata);
335         }
336
337         ao2_ref(client, -1);
338
339         return 0;
340 }
341
342 /*! \brief Helper function which starts or stops publish clients when applicable */
343 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
344 {
345         RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
346         struct ao2_container *states;
347         struct ao2_iterator i;
348         struct ast_sip_outbound_publish_state *state;
349
350         if (!publishes) {
351                 return;
352         }
353
354         states = ao2_global_obj_ref(current_states);
355         if (!states) {
356                 return;
357         }
358
359         i = ao2_iterator_init(states, 0);
360         while ((state = ao2_iterator_next(&i))) {
361                 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
362                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
363
364                 if (!state->client->started) {
365                         /* If the publisher client has not yet been started try to start it */
366                         if (!handler) {
367                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
368                                           publish->event, ast_sorcery_object_get_id(publish));
369                         } else if (handler->start_publishing(publish, state->client)) {
370                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
371                                         publish->event, ast_sorcery_object_get_id(publish));
372                         } else {
373                                 state->client->started = 1;
374                         }
375                 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
376                         /* If the publisher client has been started but it is going away stop it */
377                         removed->stop_publishing(state->client);
378                         state->client->started = 0;
379                         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
380                                 ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
381                                         ast_sorcery_object_get_id(publish));
382                                 ao2_ref(state->client, -1);
383                         }
384                 }
385                 ao2_ref(publish, -1);
386                 ao2_ref(state, -1);
387         }
388         ao2_iterator_destroy(&i);
389         ao2_ref(states, -1);
390 }
391
392 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
393 {
394         RAII_VAR(struct ao2_container *, states,
395                  ao2_global_obj_ref(current_states), ao2_cleanup);
396         RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
397
398         if (!states) {
399                 return NULL;
400         }
401
402         state = ao2_find(states, name, OBJ_SEARCH_KEY);
403         if (!state) {
404                 return NULL;
405         }
406
407         ao2_ref(state->client, +1);
408         return state->client;
409 }
410
411 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
412 {
413         struct ast_sip_event_publisher_handler *existing;
414         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
415
416         if (!handler->start_publishing || !handler->stop_publishing) {
417                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
418                 return -1;
419         } else if (ast_strlen_zero(handler->event_name)) {
420                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
421                 return -1;
422         }
423
424         existing = find_publisher_handler_for_event_name(handler->event_name);
425         if (existing) {
426                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
427                                 "A handler is already registered\n", handler->event_name);
428                 return -1;
429         }
430
431         sub_add_handler(handler);
432
433         sip_outbound_publish_synchronize(NULL);
434
435         return 0;
436 }
437
438 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
439 {
440         struct ast_sip_event_publisher_handler *iter;
441         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
442         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
443                 if (handler == iter) {
444                         AST_RWLIST_REMOVE_CURRENT(next);
445                         ast_module_unref(ast_module_info->self);
446                         break;
447                 }
448         }
449         AST_RWLIST_TRAVERSE_SAFE_END;
450
451         sip_outbound_publish_synchronize(handler);
452 }
453
454 /*! \brief Destructor function for publish information */
455 static void sip_outbound_publish_destroy(void *obj)
456 {
457         struct ast_sip_outbound_publish *publish = obj;
458
459         ast_sip_auth_vector_destroy(&publish->outbound_auths);
460
461         ast_string_field_free_memory(publish);
462 }
463
464 /*! \brief Allocator function for publish information */
465 static void *sip_outbound_publish_alloc(const char *name)
466 {
467         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
468                 sip_outbound_publish_destroy);
469
470         if (!publish || ast_string_field_init(publish, 256)) {
471                 ao2_cleanup(publish);
472                 return NULL;
473         }
474
475         return publish;
476 }
477
478 static void sip_outbound_publish_datastore_destroy(void *obj)
479 {
480         struct ast_datastore *datastore = obj;
481
482         /* Using the destroy function (if present) destroy the data */
483         if (datastore->info->destroy != NULL && datastore->data != NULL) {
484                 datastore->info->destroy(datastore->data);
485                 datastore->data = NULL;
486         }
487
488         ast_free((void *) datastore->uid);
489         datastore->uid = NULL;
490 }
491
492 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
493 {
494         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
495         const char *uid_ptr = uid;
496         char uuid_buf[AST_UUID_STR_LEN];
497
498         if (!info) {
499                 return NULL;
500         }
501
502         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
503         if (!datastore) {
504                 return NULL;
505         }
506
507         datastore->info = info;
508         if (ast_strlen_zero(uid)) {
509                 /* They didn't provide an ID so we'll provide one ourself */
510                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
511         }
512
513         datastore->uid = ast_strdup(uid_ptr);
514         if (!datastore->uid) {
515                 return NULL;
516         }
517
518         ao2_ref(datastore, +1);
519         return datastore;
520 }
521
522 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
523         struct ast_datastore *datastore)
524 {
525         ast_assert(datastore != NULL);
526         ast_assert(datastore->info != NULL);
527         ast_assert(!ast_strlen_zero(datastore->uid));
528
529         if (!ao2_link(client->datastores, datastore)) {
530                 return -1;
531         }
532         return 0;
533 }
534
535 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
536         const char *name)
537 {
538         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
539 }
540
541 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
542         const char *name)
543 {
544         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
545 }
546
547 static int sip_publish_client_service_queue(void *data)
548 {
549         RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
550         SCOPED_AO2LOCK(lock, client);
551         struct sip_outbound_publish_message *message;
552         pjsip_tx_data *tdata;
553         pj_status_t status;
554
555         if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
556                 return 0;
557         }
558
559         if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
560                 goto fatal;
561         }
562
563         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
564                 ast_sip_add_body(tdata, &message->body)) {
565                 pjsip_tx_data_dec_ref(tdata);
566                 goto fatal;
567         }
568
569         status = pjsip_publishc_send(client->client, tdata);
570         if (status == PJ_EBUSY) {
571                 /* We attempted to send the message but something else got there first */
572                 goto service;
573         } else if (status != PJ_SUCCESS) {
574                 goto fatal;
575         }
576
577         client->sending = message;
578
579         return 0;
580
581 fatal:
582         AST_LIST_REMOVE_HEAD(&client->queue, entry);
583         ast_free(message);
584
585 service:
586         if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
587                 ao2_ref(client, -1);
588         }
589         return -1;
590 }
591
592 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
593         const struct ast_sip_body *body)
594 {
595         SCOPED_AO2LOCK(lock, client);
596         struct sip_outbound_publish_message *message;
597         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
598         int res;
599
600         if (!client->client) {
601                 return -1;
602         }
603
604         /* If a body is present we need more space for the contents of it */
605         if (body) {
606                 type_len = strlen(body->type) + 1;
607                 subtype_len = strlen(body->subtype) + 1;
608                 body_text_len = strlen(body->body_text) + 1;
609         }
610
611         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
612         if (!message) {
613                 return -1;
614         }
615
616         if (body) {
617                 char *dst = message->body_contents;
618
619                 message->body.type = strcpy(dst, body->type);
620                 dst += type_len;
621                 message->body.subtype = strcpy(dst, body->subtype);
622                 dst += subtype_len;
623                 message->body.body_text = strcpy(dst, body->body_text);
624         }
625
626         AST_LIST_INSERT_TAIL(&client->queue, message, entry);
627
628         res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
629         if (res) {
630                 ao2_ref(client, -1);
631         }
632
633         return res;
634 }
635
636 /*! \brief Destructor function for publish client */
637 static void sip_outbound_publish_client_destroy(void *obj)
638 {
639         struct ast_sip_outbound_publish_client *client = obj;
640         struct sip_outbound_publish_message *message;
641
642         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
643
644         while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
645                 ast_free(message);
646         }
647
648         ao2_cleanup(client->datastores);
649         ao2_cleanup(client->publish);
650
651         /* if unloading the module and all objects have been unpublished
652            send the signal to finish unloading */
653         if (unloading.is_unloading) {
654                 ast_mutex_lock(&unloading.lock);
655                 if (--unloading.count == 0) {
656                         ast_cond_signal(&unloading.cond);
657                 }
658                 ast_mutex_unlock(&unloading.lock);
659         }
660 }
661
662 static int explicit_publish_destroy(void *data)
663 {
664         struct ast_sip_outbound_publish_client *client = data;
665
666         pjsip_publishc_destroy(client->client);
667         ao2_ref(client, -1);
668
669         return 0;
670 }
671
672 /*! \brief Helper function which cancels and un-publishes a no longer used client */
673 static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
674 {
675         struct ast_sip_event_publisher_handler *handler;
676         SCOPED_AO2LOCK(lock, client);
677
678         if (!client->started) {
679                 /* If the client was never started, there's nothing to unpublish, so just
680                  * destroy the publication and remove its reference to the client.
681                  */
682                 ast_sip_push_task(NULL, explicit_publish_destroy, client);
683                 return 0;
684         }
685
686         handler = find_publisher_handler_for_event_name(client->publish->event);
687         if (handler) {
688                 handler->stop_publishing(client);
689         }
690
691         client->started = 0;
692         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
693                 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
694                         ast_sorcery_object_get_id(client->publish));
695                 ao2_ref(client, -1);
696         }
697
698         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
699         if (!client->sending) {
700                 if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
701                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
702                                 ast_sorcery_object_get_id(client->publish));
703                         ao2_ref(client, -1);
704                 }
705         }
706         client->destroy = 1;
707         return 0;
708 }
709
710 /*! \brief Destructor function for publish state */
711 static void sip_outbound_publish_state_destroy(void *obj)
712 {
713         struct ast_sip_outbound_publish_state *state = obj;
714
715         cancel_and_unpublish(state->client);
716         ao2_cleanup(state->client);
717 }
718
719 /*!
720  * \internal
721  * \brief Check if a publish can be reused
722  *
723  * This checks if the existing outbound publish's configuration differs from a newly-applied
724  * outbound publish.
725  *
726  * \param existing The pre-existing outbound publish
727  * \param applied The newly-created publish
728  */
729 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
730 {
731         int i;
732
733         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
734                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
735                 strcmp(existing->event, applied->event) ||
736                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
737                 return 0;
738         }
739
740         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
741                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
742                         return 0;
743                 }
744         }
745
746         return 1;
747 }
748
749 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
750
751 /*! \brief Helper function that allocates a pjsip publish client and configures it */
752 static int sip_outbound_publish_client_alloc(void *data)
753 {
754         struct ast_sip_outbound_publish_client *client = data;
755         RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
756         pjsip_publishc_opt opt = {
757                 .queue_request = PJ_FALSE,
758         };
759         pj_str_t event, server_uri, to_uri, from_uri;
760         pj_status_t status;
761
762         if (client->client) {
763                 return 0;
764         } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
765                 &client->client) != PJ_SUCCESS) {
766                 ao2_ref(client, -1);
767                 return -1;
768         }
769
770         publish = ao2_bump(client->publish);
771
772         if (!ast_strlen_zero(publish->outbound_proxy)) {
773                 pjsip_route_hdr route_set, *route;
774                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
775
776                 pj_list_init(&route_set);
777
778                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
779                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
780                         pjsip_publishc_destroy(client->client);
781                         return -1;
782                 }
783                 pj_list_insert_nodes_before(&route_set, route);
784
785                 pjsip_publishc_set_route_set(client->client, &route_set);
786         }
787
788         pj_cstr(&event, publish->event);
789         pj_cstr(&server_uri, publish->server_uri);
790         pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
791         pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
792
793         status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
794                 publish->expiration);
795         if (status == PJSIP_EINVALIDURI) {
796                 pj_pool_t *pool;
797                 pj_str_t tmp;
798                 pjsip_uri *uri;
799
800                 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
801                 if (!pool) {
802                         ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
803                                 ast_sorcery_object_get_id(publish));
804                         pjsip_publishc_destroy(client->client);
805                         return -1;
806                 }
807
808                 pj_strdup2_with_null(pool, &tmp, publish->server_uri);
809                 uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
810                 if (!uri) {
811                         ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
812                                 publish->server_uri, ast_sorcery_object_get_id(publish));
813                 }
814
815                 if (!ast_strlen_zero(publish->to_uri)) {
816                         pj_strdup2_with_null(pool, &tmp, publish->to_uri);
817                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
818                         if (!uri) {
819                                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
820                                         publish->to_uri, ast_sorcery_object_get_id(publish));
821                         }
822                 }
823
824                 if (!ast_strlen_zero(publish->from_uri)) {
825                         pj_strdup2_with_null(pool, &tmp, publish->from_uri);
826                         uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
827                         if (!uri) {
828                                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
829                                         publish->from_uri, ast_sorcery_object_get_id(publish));
830                         }
831                 }
832
833                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
834                 pjsip_publishc_destroy(client->client);
835                 return -1;
836         } else if (status != PJ_SUCCESS) {
837                 pjsip_publishc_destroy(client->client);
838                 return -1;
839         }
840
841         return 0;
842 }
843
844 /*! \brief Callback function for publish client responses */
845 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
846 {
847         RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
848         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
849         SCOPED_AO2LOCK(lock, client);
850         pjsip_tx_data *tdata;
851
852         if (client->destroy) {
853                 if (client->sending) {
854                         client->sending = NULL;
855
856                         if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
857                                 return;
858                         }
859                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
860                                 ast_sorcery_object_get_id(publish));
861                         ao2_ref(client, -1);
862                 }
863                 /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
864                 pjsip_publishc_destroy(client->client);
865                 ao2_ref(client, -1);
866                 return;
867         }
868
869         if (param->code == 401 || param->code == 407) {
870                 pjsip_transaction *tsx = pjsip_rdata_get_tsx(param->rdata);
871
872                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
873                                 param->rdata, tsx->last_tx, &tdata)) {
874                         pjsip_publishc_send(client->client, tdata);
875                 }
876                 client->auth_attempts++;
877
878                 if (client->auth_attempts == publish->max_auth_attempts) {
879                         pjsip_publishc_destroy(client->client);
880                         client->client = NULL;
881
882                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
883                                 ast_sorcery_object_get_id(publish));
884
885                         goto end;
886                 }
887                 return;
888         }
889
890         client->auth_attempts = 0;
891
892         if (param->code == 412) {
893                 pjsip_publishc_destroy(client->client);
894                 client->client = NULL;
895
896                 if (sip_outbound_publish_client_alloc(publish)) {
897                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
898                                 ast_sorcery_object_get_id(publish));
899                         goto end;
900                 }
901
902                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
903                 client->sending = NULL;
904         } else if (param->code == 423) {
905                 /* Update the expiration with the new expiration time if available */
906                 pjsip_expires_hdr *expires;
907
908                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
909                 if (!expires || !expires->ivalue) {
910                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
911                                 ast_sorcery_object_get_id(publish));
912                         pjsip_publishc_destroy(client->client);
913                         client->client = NULL;
914                         goto end;
915                 }
916
917                 pjsip_publishc_update_expires(client->client, expires->ivalue);
918                 client->sending = NULL;
919         } else if (client->sending) {
920                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
921                 AST_LIST_REMOVE_HEAD(&client->queue, entry);
922                 ast_free(client->sending);
923                 client->sending = NULL;
924                 if (!param->rdata) {
925                         ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
926                                 ast_sorcery_object_get_id(publish));
927                 }
928         }
929
930         if (AST_LIST_EMPTY(&client->queue)) {
931                 schedule_publish_refresh(client, param->expiration);
932         }
933
934 end:
935         if (!client->client) {
936                 struct sip_outbound_publish_message *message;
937
938                 while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
939                         ast_free(message);
940                 }
941         } else {
942                 if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
943                         ao2_ref(client, -1);
944                 }
945         }
946 }
947
948 #define DATASTORE_BUCKETS 53
949
950 static int datastore_hash(const void *obj, int flags)
951 {
952         const struct ast_datastore *datastore;
953         const char *uid;
954
955         switch (flags & OBJ_SEARCH_MASK) {
956         case OBJ_SEARCH_KEY:
957                 uid = obj;
958                 break;
959         case OBJ_SEARCH_OBJECT:
960                 datastore = obj;
961                 uid = datastore->uid;
962                 break;
963         default:
964                 /* Hash can only work on something with a full key. */
965                 ast_assert(0);
966                 return 0;
967         }
968
969         return ast_str_hash(uid);
970 }
971
972 static int datastore_cmp(void *obj, void *arg, int flags)
973 {
974         const struct ast_datastore *object_left = obj;
975         const struct ast_datastore *object_right = arg;
976         const char *right_key = arg;
977         int cmp;
978
979         switch (flags & OBJ_SEARCH_MASK) {
980         case OBJ_SEARCH_OBJECT:
981                 right_key = object_right->uid;
982                 /* Fall through */
983         case OBJ_SEARCH_KEY:
984                 cmp = strcmp(object_left->uid, right_key);
985                 break;
986         case OBJ_SEARCH_PARTIAL_KEY:
987         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
988                 break;
989         default:
990                 /*
991                  * What arg points to is specific to this traversal callback
992                  * and has no special meaning to astobj2.
993                  */
994                 cmp = 0;
995                 break;
996         }
997         if (cmp) {
998                 return 0;
999         }
1000         /*
1001          * At this point the traversal callback is identical to a sorted
1002          * container.
1003          */
1004         return CMP_MATCH;
1005 }
1006
1007 /*! \brief Allocator function for publish client */
1008 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1009         struct ast_sip_outbound_publish *publish)
1010 {
1011         const char *id = ast_sorcery_object_get_id(publish);
1012         struct ast_sip_outbound_publish_state *state =
1013                 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1014
1015         if (!state) {
1016                 return NULL;
1017         }
1018
1019         state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1020         if (!state->client) {
1021                 ao2_ref(state, -1);
1022                 return NULL;
1023         }
1024
1025         state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1026         if (!state->client->datastores) {
1027                 ao2_ref(state, -1);
1028                 return NULL;
1029         }
1030
1031         state->client->timer.user_data = state->client;
1032         state->client->timer.cb = sip_outbound_publish_timer_cb;
1033         state->client->publish = ao2_bump(publish);
1034
1035         strcpy(state->id, id);
1036         return state;
1037 }
1038
1039 /*! \brief Apply function which finds or allocates a state structure */
1040 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1041 {
1042         RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
1043         RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
1044         struct ast_sip_outbound_publish *applied = obj;
1045
1046         if (ast_strlen_zero(applied->server_uri)) {
1047                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1048                         ast_sorcery_object_get_id(applied));
1049                 return -1;
1050         } else if (ast_strlen_zero(applied->event)) {
1051                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1052                         ast_sorcery_object_get_id(applied));
1053                 return -1;
1054         }
1055
1056         if (!new_states) {
1057                 /* make sure new_states has been allocated as we will be adding to it */
1058                 new_states = ao2_container_alloc_options(
1059                         AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1060                         outbound_publish_state_hash, outbound_publish_state_cmp);
1061
1062                 if (!new_states) {
1063                         ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1064                         return -1;
1065                 }
1066         }
1067
1068         if (states) {
1069                 state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
1070                 if (state) {
1071                         if (can_reuse_publish(state->client->publish, applied)) {
1072                                 ao2_replace(state->client->publish, applied);
1073                         } else {
1074                                 ao2_ref(state, -1);
1075                                 state = NULL;
1076                         }
1077                 }
1078         }
1079
1080         if (!state) {
1081                 state = sip_outbound_publish_state_alloc(applied);
1082                 if (!state) {
1083                         ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1084                                 ast_sorcery_object_get_id(applied));
1085                         return -1;
1086                 };
1087         }
1088
1089         if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
1090                 ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
1091                         ast_sorcery_object_get_id(applied));
1092                 return -1;
1093         }
1094
1095         ao2_link(new_states, state);
1096         return 0;
1097 }
1098
1099 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1100 {
1101         struct ast_sip_outbound_publish *publish = obj;
1102
1103         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
1104 }
1105
1106 static int load_module(void)
1107 {
1108         CHECK_PJSIP_MODULE_LOADED();
1109
1110         ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1111         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1112
1113         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1114                 sip_outbound_publish_apply)) {
1115                 return AST_MODULE_LOAD_DECLINE;
1116         }
1117
1118         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1119         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1120         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1121         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1122         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1123         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1124         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1125         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1126         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1127
1128         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1129
1130         AST_RWLIST_RDLOCK(&publisher_handlers);
1131         sip_outbound_publish_synchronize(NULL);
1132         AST_RWLIST_UNLOCK(&publisher_handlers);
1133
1134         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1135
1136         return AST_MODULE_LOAD_SUCCESS;
1137 }
1138
1139 static int reload_module(void)
1140 {
1141         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1142
1143         AST_RWLIST_RDLOCK(&publisher_handlers);
1144         sip_outbound_publish_synchronize(NULL);
1145         AST_RWLIST_UNLOCK(&publisher_handlers);
1146         return 0;
1147 }
1148
1149 static int unload_module(void)
1150 {
1151         struct timeval start = ast_tvnow();
1152         struct timespec end = {
1153                 .tv_sec = start.tv_sec + 10,
1154                 .tv_nsec = start.tv_usec * 1000
1155         };
1156         int res = 0;
1157         struct ao2_container *states = ao2_global_obj_ref(current_states);
1158
1159         if (!states || !(unloading.count = ao2_container_count(states))) {
1160                 return 0;
1161         }
1162         ao2_ref(states, -1);
1163
1164         ast_mutex_init(&unloading.lock);
1165         ast_cond_init(&unloading.cond, NULL);
1166         ast_mutex_lock(&unloading.lock);
1167
1168         unloading.is_unloading = 1;
1169         ao2_global_obj_release(current_states);
1170
1171         /* wait for items to unpublish */
1172         ast_verb(5, "Waiting to complete unpublishing task(s)\n");
1173         while (unloading.count) {
1174                 res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
1175         }
1176         ast_mutex_unlock(&unloading.lock);
1177
1178         ast_mutex_destroy(&unloading.lock);
1179         ast_cond_destroy(&unloading.cond);
1180
1181         if (res) {
1182                 ast_verb(5, "At least %d items were unable to unpublish "
1183                         "in the allowed time\n", unloading.count);
1184         } else {
1185                 ast_verb(5, "All items successfully unpublished\n");
1186         }
1187
1188         return res;
1189 }
1190
1191 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1192         .load = load_module,
1193         .reload = reload_module,
1194         .unload = unload_module,
1195         .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1196 );