res_pjsip_outbound_publish: Add multi-user support per configuration
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <support_level>core</support_level>
23  ***/
24
25 #include "asterisk.h"
26
27 #include <pjsip.h>
28 #include <pjsip_simple.h>
29
30 #include "asterisk/res_pjproject.h"
31 #include "asterisk/res_pjsip.h"
32 #include "asterisk/res_pjsip_outbound_publish.h"
33 #include "asterisk/module.h"
34 #include "asterisk/taskprocessor.h"
35 #include "asterisk/datastore.h"
36
37 /*** DOCUMENTATION
38         <configInfo name="res_pjsip_outbound_publish" language="en_US">
39                 <synopsis>SIP resource for outbound publish</synopsis>
40                 <description><para>
41                         <emphasis>Outbound Publish</emphasis>
42                         </para>
43                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
44                 </description>
45                 <configFile name="pjsip.conf">
46                         <configObject name="outbound-publish">
47                                 <synopsis>The configuration for outbound publish</synopsis>
48                                 <description><para>
49                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
50                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
51                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
52                                 </para></description>
53                                 <configOption name="expiration" default="3600">
54                                         <synopsis>Expiration time for publications in seconds</synopsis>
55                                 </configOption>
56                                 <configOption name="outbound_auth" default="">
57                                         <synopsis>Authentication object to be used for outbound publishes.</synopsis>
58                                 </configOption>
59                                 <configOption name="outbound_proxy" default="">
60                                         <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
61                                 </configOption>
62                                 <configOption name="server_uri">
63                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
64                                         <description><para>
65                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
66                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
67                                         </para></description>
68                                 </configOption>
69                                 <configOption name="from_uri">
70                                         <synopsis>SIP URI to use in the From header</synopsis>
71                                         <description><para>
72                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
73                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
74                                                 will be used.
75                                         </para></description>
76                                 </configOption>
77                                 <configOption name="to_uri">
78                                         <synopsis>SIP URI to use in the To header</synopsis>
79                                         <description><para>
80                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
81                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
82                                                 will be used.
83                                         </para></description>
84                                 </configOption>
85                                 <configOption name="event" default="">
86                                         <synopsis>Event type of the PUBLISH.</synopsis>
87                                 </configOption>
88                                 <configOption name="max_auth_attempts" default="5">
89                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
90                                 </configOption>
91                                 <configOption name="transport">
92                                         <synopsis>Transport used for outbound publish</synopsis>
93                                         <description>
94                                                 <note><para>A <replaceable>transport</replaceable> configured in
95                                                 <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
96                                         </description>
97                                 </configOption>
98                                 <configOption name="multi_user" default="no">
99                                         <synopsis>Enable multi-user support</synopsis>
100                                         <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
101                                 </configOption>
102                                 <configOption name="type">
103                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
104                                 </configOption>
105                         </configObject>
106                 </configFile>
107         </configInfo>
108  ***/
109
110 static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE;
111
112 /*! \brief Queued outbound publish message */
113 struct sip_outbound_publish_message {
114         /*! \brief Optional body */
115         struct ast_sip_body body;
116         /*! \brief Linked list information */
117         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
118         /*! \brief Extra space for body contents */
119         char body_contents[0];
120 };
121
122 /*
123  * A note about some of the object types used in this module:
124  *
125  * The reason we currently have 4 separate object types that relate to configuration,
126  * publishing, state, and client information is due to object lifetimes and order of
127  * destruction dependencies.
128  *
129  * Separation of concerns is a good thing and of course it makes sense to have a
130  * configuration object type as well as an object type wrapper around pjsip's publishing
131  * client class. There also may be run time state data that needs to be tracked, so
132  * again having something to handle that is prudent. However, it may be tempting to think
133  * "why not combine the state and client object types?" Especially seeing as how they have
134  * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit
135  * more awkward.
136  *
137  * Currently this module maintains a global container of current state objects. When this
138  * states container is replaced, or deleted, it un-references all contained objects. Any
139  * state with a reference left have probably been carried over from a reload/realtime fetch.
140  * States not carried over are destructed and the associated client (and all its publishers)
141  * get unpublished.
142  *
143  * This "unpublishing" goes through a careful process of unpublishing the client, all its
144  * publishers, and making sure all the appropriate references are removed in a sane order.
145  * This process is essentially kicked off with the destruction of the state. If the state
146  * and client objects were to be merged, where clients became the globally tracked object
147  * type, this "unpublishing" process would never start because of the multiple references
148  * held to the client object over it's lifetime. Meaning the global tracking container
149  * would remove its reference to the client object when done with it, but other sources
150  * would still be holding a reference to it (namely the datastore and publisher(s)).
151  *
152  * Thus at this time it is easier to keep them separate.
153  */
154
155 /*! \brief Outbound publish information */
156 struct ast_sip_outbound_publish {
157         /*! \brief Sorcery object details */
158         SORCERY_OBJECT(details);
159         /*! \brief Stringfields */
160         AST_DECLARE_STRING_FIELDS(
161                 /*! \brief URI for the entity and server */
162                 AST_STRING_FIELD(server_uri);
163                 /*! \brief URI for the From header */
164                 AST_STRING_FIELD(from_uri);
165                 /*! \brief URI for the To header */
166                 AST_STRING_FIELD(to_uri);
167                 /*! \brief Explicit transport to use for publish */
168                 AST_STRING_FIELD(transport);
169                 /*! \brief Outbound proxy to use */
170                 AST_STRING_FIELD(outbound_proxy);
171                 /*! \brief The event type to publish */
172                 AST_STRING_FIELD(event);
173         );
174         /*! \brief Requested expiration time */
175         unsigned int expiration;
176         /*! \brief Maximum number of auth attempts before stopping the publish client */
177         unsigned int max_auth_attempts;
178         /*! \brief Configured authentication credentials */
179         struct ast_sip_auth_vector outbound_auths;
180         /*! \brief The publishing client is used for multiple users when true */
181         unsigned int multi_user;
182 };
183
184 struct sip_outbound_publisher {
185         /*! \brief The client object that 'owns' this client
186
187              \note any potential circular reference problems are accounted
188              for (see publisher alloc for more information)
189         */
190         struct ast_sip_outbound_publish_client *owner;
191         /*! \brief Underlying publish client */
192         pjsip_publishc *client;
193         /*! \brief Timer entry for refreshing publish */
194         pj_timer_entry timer;
195         /*! \brief The number of auth attempts done */
196         unsigned int auth_attempts;
197         /*! \brief Queue of outgoing publish messages to send*/
198         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
199         /*! \brief The message currently being sent */
200         struct sip_outbound_publish_message *sending;
201         /*! \brief Publish client should be destroyed */
202         unsigned int destroy;
203         /*! \brief User, if any, associated with the publisher */
204         char user[0];
205 };
206
207 /*! \brief Outbound publish client state information (persists for lifetime of a publish) */
208 struct ast_sip_outbound_publish_client {
209         /*! \brief Outbound publish information */
210         struct ast_sip_outbound_publish *publish;
211         /*! \brief Publisher datastores set up by handlers */
212         struct ao2_container *datastores;
213         /*! \brief Container of all the client publishing objects */
214         struct ao2_container *publishers;
215         /*! \brief Publishing has been fully started and event type informed */
216         unsigned int started;
217 };
218
219 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
220 struct ast_sip_outbound_publish_state {
221         /*! \brief Outbound publish client */
222         struct ast_sip_outbound_publish_client *client;
223         /* publish state id lookup key - same as publish configuration id */
224         char id[0];
225 };
226
227 /*!
228  * \brief Used for locking while loading/reloading
229  *
230  * Mutli-user configurations make it so publishers can be dynamically added and
231  * removed. Publishers should not be added or removed during a [re]load since
232  * it could cause the current_clients container to be out of sync. Thus the
233  * reason for this lock.
234  */
235 AST_RWLOCK_DEFINE_STATIC(load_lock);
236
237 #define DEFAULT_PUBLISHER_BUCKETS 119
238 AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user);
239 AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user);
240
241 /*! \brief Unloading data */
242 struct unloading_data {
243         int is_unloading;
244         int count;
245         ast_mutex_t lock;
246         ast_cond_t cond;
247 } unloading;
248
249 /*! \brief Default number of client state container buckets */
250 #define DEFAULT_STATE_BUCKETS 31
251 static AO2_GLOBAL_OBJ_STATIC(current_states);
252 /*! \brief Used on [re]loads to hold new state data */
253 static struct ao2_container *new_states;
254
255 /*! \brief hashing function for state objects */
256 static int outbound_publish_state_hash(const void *obj, const int flags)
257 {
258         const struct ast_sip_outbound_publish_state *object;
259         const char *key;
260
261         switch (flags & OBJ_SEARCH_MASK) {
262         case OBJ_SEARCH_KEY:
263                 key = obj;
264                 break;
265         case OBJ_SEARCH_OBJECT:
266                 object = obj;
267                 key = object->id;
268                 break;
269         default:
270                 ast_assert(0);
271                 return 0;
272         }
273         return ast_str_hash(key);
274 }
275
276 /*! \brief comparator function for client objects */
277 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
278 {
279         const struct ast_sip_outbound_publish_state *object_left = obj;
280         const struct ast_sip_outbound_publish_state *object_right = arg;
281         const char *right_key = arg;
282         int cmp;
283
284         switch (flags & OBJ_SEARCH_MASK) {
285         case OBJ_SEARCH_OBJECT:
286                 right_key = object_right->id;
287                 /* Fall through */
288         case OBJ_SEARCH_KEY:
289                 cmp = strcmp(object_left->id, right_key);
290                 break;
291         case OBJ_SEARCH_PARTIAL_KEY:
292                 /* Not supported by container. */
293                 ast_assert(0);
294                 return 0;
295         default:
296                 cmp = 0;
297                 break;
298         }
299         if (cmp) {
300                 return 0;
301         }
302         return CMP_MATCH;
303 }
304
305 static struct ao2_container *get_publishes_and_update_state(void)
306 {
307         struct ao2_container *container;
308         SCOPED_WRLOCK(lock, &load_lock);
309
310         container = ast_sorcery_retrieve_by_fields(
311                 ast_sip_get_sorcery(), "outbound-publish",
312                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
313
314         if (!new_states) {
315                 return container;
316         }
317
318         ao2_global_obj_replace_unref(current_states, new_states);
319         ao2_cleanup(new_states);
320         new_states = NULL;
321
322         return container;
323 }
324
325 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
326
327 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
328 {
329         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
330         ast_module_ref(ast_module_info->self);
331 }
332
333 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
334 {
335         struct ast_sip_event_publisher_handler *iter;
336
337         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
338                 if (!strcmp(iter->event_name, event_name)) {
339                         break;
340                 }
341         }
342         return iter;
343 }
344
345 /*! \brief Helper function which cancels the refresh timer on a publisher */
346 static void cancel_publish_refresh(struct sip_outbound_publisher *publisher)
347 {
348         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) {
349                 /* The timer was successfully cancelled, drop the refcount of the publisher */
350                 ao2_ref(publisher, -1);
351         }
352 }
353
354 /*! \brief Helper function which sets up the timer to send publication */
355 static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration)
356 {
357         struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish);
358         pj_time_val delay = { .sec = 0, };
359
360         cancel_publish_refresh(publisher);
361
362         if (expiration > 0) {
363                 delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
364         }
365         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
366                 delay.sec = publish->expiration;
367         }
368         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
369                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
370         }
371
372         ao2_ref(publisher, +1);
373         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) {
374                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
375                 ao2_ref(publisher, -1);
376         }
377         ao2_ref(publish, -1);
378 }
379
380 static int publisher_client_send(void *obj, void *arg, void *data, int flags);
381
382 /*! \brief Publish client timer callback function */
383 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
384 {
385         struct sip_outbound_publisher *publisher = entry->user_data;
386
387         ao2_lock(publisher);
388         if (AST_LIST_EMPTY(&publisher->queue)) {
389                 int res;
390                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
391                 publisher_client_send(publisher, NULL, &res, 0);
392         }
393         ao2_unlock(publisher);
394
395         ao2_ref(publisher, -1);
396 }
397
398 /*! \brief Task for cancelling a refresh timer */
399 static int cancel_refresh_timer_task(void *data)
400 {
401         struct sip_outbound_publisher *publisher = data;
402
403         cancel_publish_refresh(publisher);
404         ao2_ref(publisher, -1);
405
406         return 0;
407 }
408
409 static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata)
410 {
411         if (!ast_strlen_zero(publisher->owner->publish->transport)) {
412                 pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
413                 ast_sip_set_tpselector_from_transport_name(
414                         publisher->owner->publish->transport, &selector);
415                 pjsip_tx_data_set_transport(tdata, &selector);
416         }
417 }
418
419 /*! \brief Task for sending an unpublish */
420 static int send_unpublish_task(void *data)
421 {
422         struct sip_outbound_publisher *publisher = data;
423         pjsip_tx_data *tdata;
424
425         if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) {
426                 set_transport(publisher, tdata);
427                 pjsip_publishc_send(publisher->client, tdata);
428         }
429
430         ao2_ref(publisher, -1);
431
432         return 0;
433 }
434
435 static void stop_publishing(struct ast_sip_outbound_publish_client *client,
436                             struct ast_sip_event_publisher_handler *handler)
437 {
438         if (!handler) {
439                 handler = find_publisher_handler_for_event_name(client->publish->event);
440         }
441
442         if (handler) {
443                 handler->stop_publishing(client);
444         }
445 }
446
447 static int cancel_and_unpublish(void *obj, void *arg, int flags);
448
449 /*! \brief Helper function which starts or stops publish clients when applicable */
450 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
451 {
452         RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
453         struct ao2_container *states;
454         struct ao2_iterator i;
455         struct ast_sip_outbound_publish_state *state;
456
457         if (!publishes) {
458                 return;
459         }
460
461         states = ao2_global_obj_ref(current_states);
462         if (!states) {
463                 return;
464         }
465
466         i = ao2_iterator_init(states, 0);
467         while ((state = ao2_iterator_next(&i))) {
468                 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
469                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
470
471                 if (!state->client->started) {
472                         /* If the publisher client has not yet been started try to start it */
473                         if (!handler) {
474                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
475                                           publish->event, ast_sorcery_object_get_id(publish));
476                         } else if (handler->start_publishing(publish, state->client)) {
477                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
478                                         publish->event, ast_sorcery_object_get_id(publish));
479                         } else {
480                                 state->client->started = 1;
481                         }
482                 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
483                         stop_publishing(state->client, removed);
484                         ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL);
485                         state->client->started = 0;
486                 }
487                 ao2_ref(publish, -1);
488                 ao2_ref(state, -1);
489         }
490         ao2_iterator_destroy(&i);
491         ao2_ref(states, -1);
492 }
493
494 static struct ast_sip_outbound_publish_state *sip_publish_state_get(const char *id)
495 {
496         struct ao2_container *states = ao2_global_obj_ref(current_states);
497         struct ast_sip_outbound_publish_state *res;
498
499         if (!states) {
500                 return NULL;
501         }
502
503         res = ao2_find(states, id, OBJ_SEARCH_KEY);
504         ao2_ref(states, -1);
505         return res;
506 }
507
508 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
509 {
510         struct ast_sip_outbound_publish_state *state = sip_publish_state_get(name);
511
512         if (!state) {
513                 return NULL;
514         }
515
516         ao2_ref(state->client, +1);
517         ao2_ref(state, -1);
518         return state->client;
519 }
520
521 const char *ast_sip_publish_client_get_from_uri(struct ast_sip_outbound_publish_client *client)
522 {
523         struct ast_sip_outbound_publish *publish = client->publish;
524
525         return S_OR(publish->from_uri, S_OR(publish->server_uri, ""));
526 }
527
528 const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_client *client)
529 {
530         struct ast_sip_outbound_publish *publish = client->publish;
531
532         return S_OR(publish->to_uri, S_OR(publish->server_uri, ""));
533 }
534
535 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
536 {
537         struct ast_sip_event_publisher_handler *existing;
538         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
539
540         if (!handler->start_publishing || !handler->stop_publishing) {
541                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
542                 return -1;
543         } else if (ast_strlen_zero(handler->event_name)) {
544                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
545                 return -1;
546         }
547
548         existing = find_publisher_handler_for_event_name(handler->event_name);
549         if (existing) {
550                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
551                                 "A handler is already registered\n", handler->event_name);
552                 return -1;
553         }
554
555         sub_add_handler(handler);
556
557         sip_outbound_publish_synchronize(NULL);
558
559         return 0;
560 }
561
562 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
563 {
564         struct ast_sip_event_publisher_handler *iter;
565         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
566         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
567                 if (handler == iter) {
568                         AST_RWLIST_REMOVE_CURRENT(next);
569                         ast_module_unref(ast_module_info->self);
570                         break;
571                 }
572         }
573         AST_RWLIST_TRAVERSE_SAFE_END;
574
575         sip_outbound_publish_synchronize(handler);
576 }
577
578 /*! \brief Destructor function for publish information */
579 static void sip_outbound_publish_destroy(void *obj)
580 {
581         struct ast_sip_outbound_publish *publish = obj;
582
583         ast_sip_auth_vector_destroy(&publish->outbound_auths);
584
585         ast_string_field_free_memory(publish);
586 }
587
588 /*! \brief Allocator function for publish information */
589 static void *sip_outbound_publish_alloc(const char *name)
590 {
591         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
592                 sip_outbound_publish_destroy);
593
594         if (!publish || ast_string_field_init(publish, 256)) {
595                 ao2_cleanup(publish);
596                 return NULL;
597         }
598
599         return publish;
600 }
601
602 static void sip_outbound_publish_datastore_destroy(void *obj)
603 {
604         struct ast_datastore *datastore = obj;
605
606         /* Using the destroy function (if present) destroy the data */
607         if (datastore->info->destroy != NULL && datastore->data != NULL) {
608                 datastore->info->destroy(datastore->data);
609                 datastore->data = NULL;
610         }
611
612         ast_free((void *) datastore->uid);
613         datastore->uid = NULL;
614 }
615
616 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
617 {
618         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
619         const char *uid_ptr = uid;
620         char uuid_buf[AST_UUID_STR_LEN];
621
622         if (!info) {
623                 return NULL;
624         }
625
626         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
627         if (!datastore) {
628                 return NULL;
629         }
630
631         datastore->info = info;
632         if (ast_strlen_zero(uid)) {
633                 /* They didn't provide an ID so we'll provide one ourself */
634                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
635         }
636
637         datastore->uid = ast_strdup(uid_ptr);
638         if (!datastore->uid) {
639                 return NULL;
640         }
641
642         ao2_ref(datastore, +1);
643         return datastore;
644 }
645
646 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
647         struct ast_datastore *datastore)
648 {
649         ast_assert(datastore != NULL);
650         ast_assert(datastore->info != NULL);
651         ast_assert(!ast_strlen_zero(datastore->uid));
652
653         if (!ao2_link(client->datastores, datastore)) {
654                 return -1;
655         }
656         return 0;
657 }
658
659 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
660         const char *name)
661 {
662         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
663 }
664
665 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
666         const char *name)
667 {
668         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
669 }
670
671 static int sip_publisher_service_queue(void *data)
672 {
673         RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup);
674         SCOPED_AO2LOCK(lock, publisher);
675         struct sip_outbound_publish_message *message;
676         pjsip_tx_data *tdata;
677         pj_status_t status;
678
679         if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) {
680                 return 0;
681         }
682
683         if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
684                 goto fatal;
685         }
686
687         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
688                 ast_sip_add_body(tdata, &message->body)) {
689                 pjsip_tx_data_dec_ref(tdata);
690                 goto fatal;
691         }
692
693         set_transport(publisher, tdata);
694
695         status = pjsip_publishc_send(publisher->client, tdata);
696         if (status == PJ_EBUSY) {
697                 /* We attempted to send the message but something else got there first */
698                 goto service;
699         } else if (status != PJ_SUCCESS) {
700                 goto fatal;
701         }
702
703         publisher->sending = message;
704
705         return 0;
706
707 fatal:
708         AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
709         ast_free(message);
710
711 service:
712         if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) {
713                 ao2_ref(publisher, -1);
714         }
715         return -1;
716 }
717
718 static int publisher_client_send(void *obj, void *arg, void *data, int flags)
719 {
720         struct sip_outbound_publisher *publisher = obj;
721         const struct ast_sip_body *body = arg;
722         struct sip_outbound_publish_message *message;
723         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
724         int *res = data;
725
726         *res = -1;
727         if (!publisher->client) {
728                 return -1;
729         }
730
731         /* If a body is present we need more space for the contents of it */
732         if (body) {
733                 type_len = strlen(body->type) + 1;
734                 subtype_len = strlen(body->subtype) + 1;
735                 body_text_len = strlen(body->body_text) + 1;
736         }
737
738         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
739         if (!message) {
740                 return -1;
741         }
742
743         if (body) {
744                 char *dst = message->body_contents;
745
746                 message->body.type = strcpy(dst, body->type);
747                 dst += type_len;
748                 message->body.subtype = strcpy(dst, body->subtype);
749                 dst += subtype_len;
750                 message->body.body_text = strcpy(dst, body->body_text);
751         }
752
753         AST_LIST_INSERT_TAIL(&publisher->queue, message, entry);
754
755         *res = ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher));
756         if (*res) {
757                 ao2_ref(publisher, -1);
758         }
759
760         return *res;
761 }
762
763 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
764         const struct ast_sip_body *body)
765 {
766         SCOPED_AO2LOCK(lock, client);
767         int res = 0;
768
769         ao2_callback_data(client->publishers, OBJ_NODATA,
770                           publisher_client_send, (void *)body, &res);
771         return res;
772 }
773
774 static int sip_outbound_publisher_set_uri(
775         pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri)
776 {
777         pj_str_t tmp;
778         pjsip_uri *parsed;
779         pjsip_sip_uri *parsed_uri;
780         int size;
781
782         pj_strdup2_with_null(pool, &tmp, uri);
783         if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) {
784                 return -1;
785         }
786
787         if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
788                 return -1;
789         }
790
791         if (!ast_strlen_zero(user)) {
792                 pj_strdup2(pool, &parsed_uri->user, user);
793         }
794
795         res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size);
796         if (!res_uri->ptr) {
797                 return -1;
798         }
799
800         if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr,
801                                     pjsip_max_url_size - 1)) <= 0) {
802                 return -1;
803         }
804         res_uri->ptr[size] = '\0';
805         res_uri->slen = size;
806
807         return 0;
808 }
809
810 static int sip_outbound_publisher_set_uris(
811         pj_pool_t *pool, struct sip_outbound_publisher *publisher,
812         pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri)
813 {
814         struct ast_sip_outbound_publish *publish = publisher->owner->publish;
815
816         if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) {
817                 ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
818                         publish->server_uri, ast_sorcery_object_get_id(publish));
819                 return -1;
820         }
821
822         if (ast_strlen_zero(publish->to_uri)) {
823                 to_uri->ptr = server_uri->ptr;
824                 to_uri->slen = server_uri->slen;
825         } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) {
826                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
827                         publish->to_uri, ast_sorcery_object_get_id(publish));
828                 return -1;
829         }
830
831         if (ast_strlen_zero(publish->from_uri)) {
832                 from_uri->ptr = server_uri->ptr;
833                 from_uri->slen = server_uri->slen;
834         } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) {
835                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
836                         publish->from_uri, ast_sorcery_object_get_id(publish));
837                 return -1;
838         }
839
840         return 0;
841 }
842
843 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
844
845 /*! \brief Helper function that allocates a pjsip publish client and configures it */
846 static int sip_outbound_publisher_init(void *data)
847 {
848         struct sip_outbound_publisher *publisher = data;
849         RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
850         pjsip_publishc_opt opt = {
851                 .queue_request = PJ_FALSE,
852         };
853         pj_pool_t *pool;
854         pj_str_t event, server_uri, to_uri, from_uri;
855
856         if (publisher->client) {
857                 return 0;
858         }
859
860         if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt,
861                                   ao2_bump(publisher), sip_outbound_publish_callback,
862                 &publisher->client) != PJ_SUCCESS) {
863                 ao2_ref(publisher, -1);
864                 return -1;
865         }
866
867         publish = ao2_bump(publisher->owner->publish);
868
869         if (!ast_strlen_zero(publish->outbound_proxy)) {
870                 pjsip_route_hdr route_set, *route;
871                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
872
873                 pj_list_init(&route_set);
874
875                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME,
876                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
877                         pjsip_publishc_destroy(publisher->client);
878                         return -1;
879                 }
880                 pj_list_insert_nodes_before(&route_set, route);
881
882                 pjsip_publishc_set_route_set(publisher->client, &route_set);
883         }
884
885         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation",
886                                        pjsip_max_url_size, pjsip_max_url_size);
887         if (!pool) {
888                 ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
889                         ast_sorcery_object_get_id(publish));
890                 pjsip_publishc_destroy(publisher->client);
891                 return -1;
892         }
893
894         if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) {
895                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
896                 pjsip_publishc_destroy(publisher->client);
897                 return -1;
898         }
899
900         pj_cstr(&event, publish->event);
901         if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri,
902                                 publish->expiration != PJ_SUCCESS)) {
903                 ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n",
904                         ast_sorcery_object_get_id(publish));
905                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
906                 pjsip_publishc_destroy(publisher->client);
907                 return -1;
908         }
909
910         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
911         return 0;
912 }
913
914 static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags)
915 {
916         return sip_outbound_publisher_init(obj);
917 }
918
919 static int sip_outbound_publisher_reinit_all(void *data)
920 {
921         ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL);
922         return 0;
923 }
924
925 /*! \brief Destructor function for publish client */
926 static void sip_outbound_publisher_destroy(void *obj)
927 {
928         struct sip_outbound_publisher *publisher = obj;
929         struct sip_outbound_publish_message *message;
930
931         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
932
933         while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
934                 ast_free(message);
935         }
936
937         ao2_cleanup(publisher->owner);
938
939         /* if unloading the module and all objects have been unpublished
940            send the signal to finish unloading */
941         if (unloading.is_unloading) {
942                 ast_mutex_lock(&unloading.lock);
943                 if (--unloading.count == 0) {
944                         ast_cond_signal(&unloading.cond);
945                 }
946                 ast_mutex_unlock(&unloading.lock);
947         }
948 }
949
950 static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
951         struct ast_sip_outbound_publish_client *client, const char *user)
952 {
953         struct sip_outbound_publisher *publisher;
954
955         publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1,
956                               sip_outbound_publisher_destroy);
957         if (!publisher) {
958                 return NULL;
959         }
960
961         /*
962          * Bump the ref to the client. This essentially creates a circular reference,
963          * but it is needed in order to make sure the client object doesn't get pulled
964          * out from under us when the publisher stops publishing.
965          *
966          * The circular reference is alleviated by calling cancel_and_unpublish for
967          * each client, from the state's destructor. By calling it there all references
968          * to the publishers should go to zero, thus calling the publisher's destructor.
969          * This in turn removes the client reference we added here. The state then removes
970          * its reference to the client, which should take it to zero.
971          */
972         publisher->owner = ao2_bump(client);
973         publisher->timer.user_data = publisher;
974         publisher->timer.cb = sip_outbound_publish_timer_cb;
975         if (user) {
976                 strcpy(publisher->user, user);
977         } else {
978                 *publisher->user = '\0';
979         }
980
981         if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
982                 ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
983                         ast_sorcery_object_get_id(client->publish));
984                 ao2_ref(publisher, -1);
985                 return NULL;
986         }
987
988         return publisher;
989 }
990
991 static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
992         struct ast_sip_outbound_publish_client *client, const char *user)
993 {
994         struct sip_outbound_publisher *publisher =
995                 sip_outbound_publisher_alloc(client, user);
996
997         if (!publisher) {
998                 return NULL;
999         }
1000
1001         if (!ao2_link(client->publishers, publisher)) {
1002                 /*
1003                  * No need to bump the reference here. The task will take care of
1004                  * removing the reference.
1005                  */
1006                 if (ast_sip_push_task(NULL, cancel_refresh_timer_task, publisher)) {
1007                         ao2_ref(publisher, -1);
1008                 }
1009                 return NULL;
1010         }
1011
1012         return publisher;
1013 }
1014
1015 int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
1016                                      const char *user, const struct ast_sip_body *body)
1017 {
1018         struct sip_outbound_publisher *publisher;
1019         int res;
1020
1021         /*
1022          * Lock before searching since there could be a race between searching and adding.
1023          * Just use the load_lock since we might need to lock it anyway (if adding) and
1024          * also it simplifies the code (otherwise we'd have to lock the publishers, no-
1025          * lock the search and pass a flag to 'add publisher to no-lock the potential link).
1026          */
1027         ast_rwlock_wrlock(&load_lock);
1028         publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY);
1029         if (!publisher) {
1030                 if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) {
1031                         ast_rwlock_unlock(&load_lock);
1032                         return -1;
1033                 }
1034         }
1035         ast_rwlock_unlock(&load_lock);
1036
1037         publisher_client_send(publisher, (void *)body, &res, 0);
1038         ao2_ref(publisher, -1);
1039         return res;
1040 }
1041
1042 void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
1043                                    const char *user)
1044 {
1045         SCOPED_WRLOCK(lock, &load_lock);
1046         ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
1047 }
1048
1049 static int explicit_publish_destroy(void *data)
1050 {
1051         struct sip_outbound_publisher *publisher = data;
1052
1053         /*
1054          * If there is no pjsip publishing client then we obviously don't need
1055          * to destroy it. Also, the ref for the Asterisk publishing client that
1056          * pjsip had would not exist or should already be gone as well.
1057          */
1058         if (publisher->client) {
1059                 pjsip_publishc_destroy(publisher->client);
1060                 ao2_ref(publisher, -1);
1061         }
1062
1063         return 0;
1064 }
1065
1066 /*! \brief Helper function which cancels and un-publishes a no longer used client */
1067 static int cancel_and_unpublish(void *obj, void *arg, int flags)
1068 {
1069         struct sip_outbound_publisher *publisher = obj;
1070         struct ast_sip_outbound_publish_client *client = publisher->owner;
1071
1072         SCOPED_AO2LOCK(lock, publisher);
1073
1074         if (!client->started) {
1075                 /* If the publisher was never started, there's nothing to unpublish, so just
1076                  * destroy the publication and remove its reference to the publisher.
1077                  */
1078                 ast_sip_push_task(NULL, explicit_publish_destroy, publisher);
1079                 return 0;
1080         }
1081
1082         if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publisher))) {
1083                 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
1084                         ast_sorcery_object_get_id(client->publish));
1085                 ao2_ref(publisher, -1);
1086         }
1087
1088         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
1089         if (!publisher->sending) {
1090                 if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) {
1091                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
1092                                 ast_sorcery_object_get_id(client->publish));
1093                         ao2_ref(publisher, -1);
1094                 }
1095         }
1096         publisher->destroy = 1;
1097         return 0;
1098 }
1099
1100 /*! \brief Destructor function for publish client */
1101 static void sip_outbound_publish_client_destroy(void *obj)
1102 {
1103         struct ast_sip_outbound_publish_client *client = obj;
1104
1105         ao2_cleanup(client->datastores);
1106
1107         /*
1108          * The client's publishers have already been unpublished and destroyed
1109          * by this point, so it is safe to finally remove the reference to the
1110          * publish object. The client needed to hold a reference to it until
1111          * the publishers were done with it.
1112          */
1113         ao2_cleanup(client->publish);
1114 }
1115
1116 /*! \brief Destructor function for publish state */
1117 static void sip_outbound_publish_state_destroy(void *obj)
1118 {
1119         struct ast_sip_outbound_publish_state *state = obj;
1120
1121         stop_publishing(state->client, NULL);
1122         /*
1123          * Since the state is being destroyed the associated client needs to also
1124          * be destroyed. However simply removing the reference to the client will
1125          * not initiate client destruction since the client's publisher(s) hold a
1126          * reference to the client object as well. So we need to unpublish the
1127          * the client's publishers here, which will remove the publisher's client
1128          * reference during that process.
1129          *
1130          * That being said we don't want to remove the client's reference to the
1131          * publish object just yet. We'll hold off on that until client destruction
1132          * itself. This is because the publishers need access to the client's
1133          * publish object while they are unpublishing.
1134          */
1135         ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL);
1136         ao2_cleanup(state->client->publishers);
1137
1138         state->client->started = 0;
1139         ao2_cleanup(state->client);
1140 }
1141
1142 /*!
1143  * \internal
1144  * \brief Check if a publish can be reused
1145  *
1146  * This checks if the existing outbound publish's configuration differs from a newly-applied
1147  * outbound publish.
1148  *
1149  * \param existing The pre-existing outbound publish
1150  * \param applied The newly-created publish
1151  */
1152 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
1153 {
1154         int i;
1155
1156         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
1157                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
1158                 strcmp(existing->event, applied->event) ||
1159                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
1160                 return 0;
1161         }
1162
1163         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
1164                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
1165                         return 0;
1166                 }
1167         }
1168
1169         return 1;
1170 }
1171
1172 /*! \brief Callback function for publish client responses */
1173 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
1174 {
1175 #define DESTROY_CLIENT() do {                      \
1176         pjsip_publishc_destroy(publisher->client); \
1177         publisher->client = NULL; \
1178         ao2_ref(publisher, -1); } while (0)
1179
1180         RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup);
1181         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup);
1182         SCOPED_AO2LOCK(lock, publisher);
1183         pjsip_tx_data *tdata;
1184
1185         if (publisher->destroy) {
1186                 if (publisher->sending) {
1187                         publisher->sending = NULL;
1188
1189                         if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) {
1190                                 return;
1191                         }
1192                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
1193                                 ast_sorcery_object_get_id(publish));
1194                         ao2_ref(publisher, -1);
1195                 }
1196                 /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */
1197                 DESTROY_CLIENT();
1198                 return;
1199         }
1200
1201         if (param->code == 401 || param->code == 407) {
1202                 pjsip_transaction *tsx = pjsip_rdata_get_tsx(param->rdata);
1203
1204                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
1205                                 param->rdata, tsx->last_tx, &tdata)) {
1206                         set_transport(publisher, tdata);
1207                         pjsip_publishc_send(publisher->client, tdata);
1208                 }
1209                 publisher->auth_attempts++;
1210
1211                 if (publisher->auth_attempts == publish->max_auth_attempts) {
1212                         DESTROY_CLIENT();
1213                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
1214                                 ast_sorcery_object_get_id(publish));
1215
1216                         goto end;
1217                 }
1218                 return;
1219         }
1220
1221         publisher->auth_attempts = 0;
1222
1223         if (param->code == 412) {
1224                 DESTROY_CLIENT();
1225                 if (sip_outbound_publisher_init(publisher)) {
1226                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
1227                                 ast_sorcery_object_get_id(publish));
1228                         goto end;
1229                 }
1230
1231                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
1232                 publisher->sending = NULL;
1233         } else if (param->code == 423) {
1234                 /* Update the expiration with the new expiration time if available */
1235                 pjsip_expires_hdr *expires;
1236
1237                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
1238                 if (!expires || !expires->ivalue) {
1239                         DESTROY_CLIENT();
1240                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
1241                                 ast_sorcery_object_get_id(publish));
1242                         goto end;
1243                 }
1244
1245                 pjsip_publishc_update_expires(publisher->client, expires->ivalue);
1246                 publisher->sending = NULL;
1247         } else if (publisher->sending) {
1248                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
1249                 AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
1250                 ast_free(publisher->sending);
1251                 publisher->sending = NULL;
1252                 if (!param->rdata) {
1253                         ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
1254                                 ast_sorcery_object_get_id(publish));
1255                 }
1256         }
1257
1258         if (AST_LIST_EMPTY(&publisher->queue)) {
1259                 schedule_publish_refresh(publisher, param->expiration);
1260         }
1261
1262 end:
1263         if (!publisher->client) {
1264                 struct sip_outbound_publish_message *message;
1265
1266                 while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
1267                         ast_free(message);
1268                 }
1269         } else {
1270                 if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) {
1271                         ao2_ref(publisher, -1);
1272                 }
1273         }
1274 }
1275
1276 #define DATASTORE_BUCKETS 53
1277
1278 static int datastore_hash(const void *obj, int flags)
1279 {
1280         const struct ast_datastore *datastore;
1281         const char *uid;
1282
1283         switch (flags & OBJ_SEARCH_MASK) {
1284         case OBJ_SEARCH_KEY:
1285                 uid = obj;
1286                 break;
1287         case OBJ_SEARCH_OBJECT:
1288                 datastore = obj;
1289                 uid = datastore->uid;
1290                 break;
1291         default:
1292                 /* Hash can only work on something with a full key. */
1293                 ast_assert(0);
1294                 return 0;
1295         }
1296
1297         return ast_str_hash(uid);
1298 }
1299
1300 static int datastore_cmp(void *obj, void *arg, int flags)
1301 {
1302         const struct ast_datastore *object_left = obj;
1303         const struct ast_datastore *object_right = arg;
1304         const char *right_key = arg;
1305         int cmp;
1306
1307         switch (flags & OBJ_SEARCH_MASK) {
1308         case OBJ_SEARCH_OBJECT:
1309                 right_key = object_right->uid;
1310                 /* Fall through */
1311         case OBJ_SEARCH_KEY:
1312                 cmp = strcmp(object_left->uid, right_key);
1313                 break;
1314         case OBJ_SEARCH_PARTIAL_KEY:
1315         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
1316                 break;
1317         default:
1318                 /*
1319                  * What arg points to is specific to this traversal callback
1320                  * and has no special meaning to astobj2.
1321                  */
1322                 cmp = 0;
1323                 break;
1324         }
1325         if (cmp) {
1326                 return 0;
1327         }
1328         /*
1329          * At this point the traversal callback is identical to a sorted
1330          * container.
1331          */
1332         return CMP_MATCH;
1333 }
1334
1335 /*! \brief Allocator function for publish client */
1336 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1337         struct ast_sip_outbound_publish *publish)
1338 {
1339         const char *id = ast_sorcery_object_get_id(publish);
1340         struct ast_sip_outbound_publish_state *state =
1341                 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1342
1343         if (!state) {
1344                 return NULL;
1345         }
1346
1347         state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1348         if (!state->client) {
1349                 ao2_ref(state, -1);
1350                 return NULL;
1351         }
1352
1353         state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1354         if (!state->client->datastores) {
1355                 ao2_ref(state, -1);
1356                 return NULL;
1357         }
1358
1359         state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn,
1360                                                         sip_outbound_publisher_cmp_fn);
1361         if (!state->client->publishers) {
1362                 ao2_ref(state, -1);
1363                 return NULL;
1364         }
1365         state->client->publish = ao2_bump(publish);
1366
1367         strcpy(state->id, id);
1368         return state;
1369 }
1370
1371 static int validate_publish_config(struct ast_sip_outbound_publish *publish)
1372 {
1373         if (ast_strlen_zero(publish->server_uri)) {
1374                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1375                         ast_sorcery_object_get_id(publish));
1376                 return -1;
1377         } else if (ast_strlen_zero(publish->event)) {
1378                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1379                         ast_sorcery_object_get_id(publish));
1380                 return -1;
1381         }
1382         return 0;
1383 }
1384
1385 static int current_state_reusable(struct ast_sip_outbound_publish *publish,
1386                                   struct ast_sip_outbound_publish_state *current_state)
1387 {
1388         struct ast_sip_outbound_publish *old_publish;
1389
1390         /*
1391          * Don't maintain the old state/client objects if the multi_user option changed.
1392          */
1393         if ((!publish->multi_user && current_state->client->publish->multi_user) ||
1394             (publish->multi_user && !current_state->client->publish->multi_user)) {
1395                 return 0;
1396         }
1397
1398
1399         if (!can_reuse_publish(current_state->client->publish, publish)) {
1400                 /*
1401                  * Something significant has changed in the configuration, so we are
1402                  * unable to use the old state object. The current state needs to go
1403                  * away and a new one needs to be created.
1404                  */
1405                 return 0;
1406         }
1407
1408         /*
1409          * We can reuse the current state object so keep it, but swap out the
1410          * underlying publish object with the new one.
1411          */
1412         old_publish = current_state->client->publish;
1413         current_state->client->publish = publish;
1414         if (ast_sip_push_task_synchronous(
1415                     NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
1416                 /*
1417                  * If the state object fails to re-initialize then swap
1418                  * the old publish info back in.
1419                  */
1420                 current_state->client->publish = publish;
1421                 ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n",
1422                         ast_sorcery_object_get_id(current_state->client->publish));
1423                 return -1;
1424         }
1425
1426         /*
1427          * Since we swapped out the publish object the new one needs a ref
1428          * while the old one needs to go away.
1429          */
1430         ao2_ref(current_state->client->publish, +1);
1431         ao2_cleanup(old_publish);
1432
1433         /* Tell the caller that the current state object should be used */
1434         return 1;
1435 }
1436
1437 /*! \brief Apply function which finds or allocates a state structure */
1438 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1439 {
1440 #define ADD_TO_NEW_STATES(__obj) \
1441         do { if (__obj) { \
1442              ao2_link(new_states, __obj); \
1443              ao2_ref(__obj, -1); } } while (0)
1444
1445         struct ast_sip_outbound_publish *applied = obj;
1446         struct ast_sip_outbound_publish_state *current_state, *new_state;
1447         struct sip_outbound_publisher *publisher = NULL;
1448         int res;
1449
1450         /*
1451          * New states are being loaded or reloaded. We'll need to add the new
1452          * object if created/updated, or keep the old object if an error occurs.
1453          */
1454         if (!new_states) {
1455                 new_states = ao2_container_alloc_options(
1456                         AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1457                         outbound_publish_state_hash, outbound_publish_state_cmp);
1458
1459                 if (!new_states) {
1460                         ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1461                         return -1;
1462                 }
1463         }
1464
1465         /* If there is current state we'll want to maintain it if any errors occur */
1466         current_state = sip_publish_state_get(ast_sorcery_object_get_id(applied));
1467
1468         if ((res = validate_publish_config(applied))) {
1469                 ADD_TO_NEW_STATES(current_state);
1470                 return res;
1471         }
1472
1473         if (current_state && (res = current_state_reusable(applied, current_state))) {
1474                 /*
1475                  * The current state object was able to be reused, or an error
1476                  * occurred. Either way we keep the current state and be done.
1477                  */
1478                 ADD_TO_NEW_STATES(current_state);
1479                 return res == 1 ? 0 : -1;
1480         }
1481
1482         /*
1483          * No current state was found or it was unable to be reused. Either way
1484          * we'll need to create a new state object.
1485          */
1486         new_state = sip_outbound_publish_state_alloc(applied);
1487         if (!new_state) {
1488                 ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1489                         ast_sorcery_object_get_id(applied));
1490                 ADD_TO_NEW_STATES(current_state);
1491                 return -1;
1492         };
1493
1494         if (!applied->multi_user &&
1495             !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) {
1496                 ADD_TO_NEW_STATES(current_state);
1497                 ao2_ref(new_state, -1);
1498                 return -1;
1499         }
1500         ao2_cleanup(publisher);
1501
1502         ADD_TO_NEW_STATES(new_state);
1503         ao2_cleanup(current_state);
1504         return res;
1505 }
1506
1507 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1508 {
1509         struct ast_sip_outbound_publish *publish = obj;
1510
1511         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
1512 }
1513
1514 static int load_module(void)
1515 {
1516         CHECK_PJSIP_MODULE_LOADED();
1517
1518         /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
1519         ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size);
1520
1521         ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1522         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1523
1524         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1525                 sip_outbound_publish_apply)) {
1526                 ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
1527                 return AST_MODULE_LOAD_DECLINE;
1528         }
1529
1530         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1531         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1532         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1533         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1534         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1535         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1536         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1537         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1538         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
1539         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1540         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
1541
1542         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1543
1544         AST_RWLIST_RDLOCK(&publisher_handlers);
1545         sip_outbound_publish_synchronize(NULL);
1546         AST_RWLIST_UNLOCK(&publisher_handlers);
1547
1548         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1549
1550         return AST_MODULE_LOAD_SUCCESS;
1551 }
1552
1553 static int reload_module(void)
1554 {
1555         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1556
1557         AST_RWLIST_RDLOCK(&publisher_handlers);
1558         sip_outbound_publish_synchronize(NULL);
1559         AST_RWLIST_UNLOCK(&publisher_handlers);
1560         return 0;
1561 }
1562
1563 static int current_publishing_count(void *obj, void *arg, int flags)
1564 {
1565         struct ast_sip_outbound_publish_state *state = obj;
1566         unloading.count += ao2_container_count(state->client->publishers);
1567         return 0;
1568 }
1569
1570 static int unload_module(void)
1571 {
1572         struct timeval start = ast_tvnow();
1573         struct timespec end = {
1574                 .tv_sec = start.tv_sec + 10,
1575                 .tv_nsec = start.tv_usec * 1000
1576         };
1577         int res = 0;
1578         struct ao2_container *states = ao2_global_obj_ref(current_states);
1579
1580         if (!states) {
1581                 return 0;
1582         }
1583
1584         unloading.count = 0;
1585         ao2_callback(states, OBJ_NODATA, current_publishing_count, NULL);
1586         ao2_ref(states, -1);
1587
1588         if (!unloading.count) {
1589                 return 0;
1590         }
1591
1592         ast_mutex_init(&unloading.lock);
1593         ast_cond_init(&unloading.cond, NULL);
1594         ast_mutex_lock(&unloading.lock);
1595
1596         unloading.is_unloading = 1;
1597         ao2_global_obj_release(current_states);
1598
1599         /* wait for items to unpublish */
1600         ast_verb(5, "Waiting to complete unpublishing task(s)\n");
1601         while (unloading.count && !res) {
1602                 res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
1603         }
1604         ast_mutex_unlock(&unloading.lock);
1605
1606         ast_mutex_destroy(&unloading.lock);
1607         ast_cond_destroy(&unloading.cond);
1608
1609         if (res) {
1610                 ast_verb(5, "At least %d items were unable to unpublish "
1611                         "in the allowed time\n", unloading.count);
1612         } else {
1613                 ast_verb(5, "All items successfully unpublished\n");
1614                 ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
1615         }
1616
1617         return res;
1618 }
1619
1620 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1621         .load = load_module,
1622         .reload = reload_module,
1623         .unload = unload_module,
1624         .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1625 );