res_pjsip_outbound_registration: Fix leak on vector add failure.
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjproject</depend>
22         <depend>res_pjsip</depend>
23         <support_level>core</support_level>
24  ***/
25
26 #include "asterisk.h"
27
28 #include <pjsip.h>
29 #include <pjsip_simple.h>
30
31 #include "asterisk/res_pjproject.h"
32 #include "asterisk/res_pjsip.h"
33 #include "asterisk/res_pjsip_outbound_publish.h"
34 #include "asterisk/module.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/datastore.h"
38 #include "res_pjsip/include/res_pjsip_private.h"
39
40 /*** DOCUMENTATION
41         <configInfo name="res_pjsip_outbound_publish" language="en_US">
42                 <synopsis>SIP resource for outbound publish</synopsis>
43                 <description><para>
44                         <emphasis>Outbound Publish</emphasis>
45                         </para>
46                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
47                 </description>
48                 <configFile name="pjsip.conf">
49                         <configObject name="outbound-publish">
50                                 <synopsis>The configuration for outbound publish</synopsis>
51                                 <description><para>
52                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
53                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
54                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
55                                 </para></description>
56                                 <configOption name="expiration" default="3600">
57                                         <synopsis>Expiration time for publications in seconds</synopsis>
58                                 </configOption>
59                                 <configOption name="outbound_auth" default="">
60                                         <synopsis>Authentication object(s) to be used for outbound publishes.</synopsis>
61                                         <description><para>
62                                                 This is a comma-delimited list of <replaceable>auth</replaceable>
63                                                 sections defined in <filename>pjsip.conf</filename> used to respond
64                                                 to outbound authentication challenges.</para>
65                                                 <note><para>
66                                                 Using the same auth section for inbound and outbound
67                                                 authentication is not recommended.  There is a difference in
68                                                 meaning for an empty realm setting between inbound and outbound
69                                                 authentication uses.  See the auth realm description for details.
70                                                 </para></note>
71                                         </description>
72                                 </configOption>
73                                 <configOption name="outbound_proxy" default="">
74                                         <synopsis>Full SIP URI of the outbound proxy used to send publishes</synopsis>
75                                 </configOption>
76                                 <configOption name="server_uri">
77                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
78                                         <description><para>
79                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
80                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
81                                         </para></description>
82                                 </configOption>
83                                 <configOption name="from_uri">
84                                         <synopsis>SIP URI to use in the From header</synopsis>
85                                         <description><para>
86                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
87                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
88                                                 will be used.
89                                         </para></description>
90                                 </configOption>
91                                 <configOption name="to_uri">
92                                         <synopsis>SIP URI to use in the To header</synopsis>
93                                         <description><para>
94                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
95                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
96                                                 will be used.
97                                         </para></description>
98                                 </configOption>
99                                 <configOption name="event" default="">
100                                         <synopsis>Event type of the PUBLISH.</synopsis>
101                                 </configOption>
102                                 <configOption name="max_auth_attempts" default="5">
103                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
104                                 </configOption>
105                                 <configOption name="transport">
106                                         <synopsis>Transport used for outbound publish</synopsis>
107                                         <description>
108                                                 <note><para>A <replaceable>transport</replaceable> configured in
109                                                 <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
110                                         </description>
111                                 </configOption>
112                                 <configOption name="multi_user" default="no">
113                                         <synopsis>Enable multi-user support</synopsis>
114                                         <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
115                                 </configOption>
116                                 <configOption name="type">
117                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
118                                 </configOption>
119                         </configObject>
120                 </configFile>
121         </configInfo>
122  ***/
123
124 static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE;
125
126 /*! \brief Queued outbound publish message */
127 struct sip_outbound_publish_message {
128         /*! \brief Optional body */
129         struct ast_sip_body body;
130         /*! \brief Linked list information */
131         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
132         /*! \brief Extra space for body contents */
133         char body_contents[0];
134 };
135
136 /*
137  * A note about some of the object types used in this module:
138  *
139  * The reason we currently have 4 separate object types that relate to configuration,
140  * publishing, state, and client information is due to object lifetimes and order of
141  * destruction dependencies.
142  *
143  * Separation of concerns is a good thing and of course it makes sense to have a
144  * configuration object type as well as an object type wrapper around pjsip's publishing
145  * client class. There also may be run time state data that needs to be tracked, so
146  * again having something to handle that is prudent. However, it may be tempting to think
147  * "why not combine the state and client object types?" Especially seeing as how they have
148  * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit
149  * more awkward.
150  *
151  * Currently this module maintains a global container of current state objects. When this
152  * states container is replaced, or deleted, it un-references all contained objects. Any
153  * state with a reference left have probably been carried over from a reload/realtime fetch.
154  * States not carried over are destructed and the associated client (and all its publishers)
155  * get unpublished.
156  *
157  * This "unpublishing" goes through a careful process of unpublishing the client, all its
158  * publishers, and making sure all the appropriate references are removed in a sane order.
159  * This process is essentially kicked off with the destruction of the state. If the state
160  * and client objects were to be merged, where clients became the globally tracked object
161  * type, this "unpublishing" process would never start because of the multiple references
162  * held to the client object over it's lifetime. Meaning the global tracking container
163  * would remove its reference to the client object when done with it, but other sources
164  * would still be holding a reference to it (namely the datastore and publisher(s)).
165  *
166  * Thus at this time it is easier to keep them separate.
167  */
168
169 /*! \brief Outbound publish information */
170 struct ast_sip_outbound_publish {
171         /*! \brief Sorcery object details */
172         SORCERY_OBJECT(details);
173         /*! \brief Stringfields */
174         AST_DECLARE_STRING_FIELDS(
175                 /*! \brief URI for the entity and server */
176                 AST_STRING_FIELD(server_uri);
177                 /*! \brief URI for the From header */
178                 AST_STRING_FIELD(from_uri);
179                 /*! \brief URI for the To header */
180                 AST_STRING_FIELD(to_uri);
181                 /*! \brief Explicit transport to use for publish */
182                 AST_STRING_FIELD(transport);
183                 /*! \brief Outbound proxy to use */
184                 AST_STRING_FIELD(outbound_proxy);
185                 /*! \brief The event type to publish */
186                 AST_STRING_FIELD(event);
187         );
188         /*! \brief Requested expiration time */
189         unsigned int expiration;
190         /*! \brief Maximum number of auth attempts before stopping the publish client */
191         unsigned int max_auth_attempts;
192         /*! \brief Configured authentication credentials */
193         struct ast_sip_auth_vector outbound_auths;
194         /*! \brief The publishing client is used for multiple users when true */
195         unsigned int multi_user;
196 };
197
198 struct sip_outbound_publisher {
199         /*! \brief The client object that 'owns' this client
200
201              \note any potential circular reference problems are accounted
202              for (see publisher alloc for more information)
203         */
204         struct ast_sip_outbound_publish_client *owner;
205         /*! \brief Underlying publish client */
206         pjsip_publishc *client;
207         /*! \brief The From URI for this specific publisher */
208         char *from_uri;
209         /*! \brief The To URI for this specific publisher */
210         char *to_uri;
211         /*! \brief Timer entry for refreshing publish */
212         pj_timer_entry timer;
213         /*! \brief The number of auth attempts done */
214         unsigned int auth_attempts;
215         /*! \brief Queue of outgoing publish messages to send*/
216         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
217         /*! \brief The message currently being sent */
218         struct sip_outbound_publish_message *sending;
219         /*! \brief Publish client should be destroyed */
220         unsigned int destroy;
221         /*! \brief Serializer for stuff and things */
222         struct ast_taskprocessor *serializer;
223         /*! \brief User, if any, associated with the publisher */
224         char user[0];
225 };
226
227 /*! \brief Outbound publish client state information (persists for lifetime of a publish) */
228 struct ast_sip_outbound_publish_client {
229         /*! \brief Outbound publish information */
230         struct ast_sip_outbound_publish *publish;
231         /*! \brief Publisher datastores set up by handlers */
232         struct ao2_container *datastores;
233         /*! \brief Container of all the client publishing objects */
234         struct ao2_container *publishers;
235         /*! \brief Publishing has been fully started and event type informed */
236         unsigned int started;
237 };
238
239 /*! \brief Outbound publish state information (persists for lifetime of a publish) */
240 struct ast_sip_outbound_publish_state {
241         /*! \brief Outbound publish client */
242         struct ast_sip_outbound_publish_client *client;
243         /* publish state id lookup key - same as publish configuration id */
244         char id[0];
245 };
246
247 /*!
248  * \brief Used for locking while loading/reloading
249  *
250  * Mutli-user configurations make it so publishers can be dynamically added and
251  * removed. Publishers should not be added or removed during a [re]load since
252  * it could cause the current_clients container to be out of sync. Thus the
253  * reason for this lock.
254  */
255 AST_RWLOCK_DEFINE_STATIC(load_lock);
256
257 #define DEFAULT_PUBLISHER_BUCKETS 119
258 AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user);
259 AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user);
260
261 /*! Time needs to be long enough for a transaction to timeout if nothing replies. */
262 #define MAX_UNLOAD_TIMEOUT_TIME         35      /* Seconds */
263
264 /*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
265 static struct ast_serializer_shutdown_group *shutdown_group;
266
267 /*! \brief Default number of client state container buckets */
268 #define DEFAULT_STATE_BUCKETS 31
269 static AO2_GLOBAL_OBJ_STATIC(current_states);
270 /*! \brief Used on [re]loads to hold new state data */
271 static struct ao2_container *new_states;
272
273 /*! \brief hashing function for state objects */
274 static int outbound_publish_state_hash(const void *obj, const int flags)
275 {
276         const struct ast_sip_outbound_publish_state *object;
277         const char *key;
278
279         switch (flags & OBJ_SEARCH_MASK) {
280         case OBJ_SEARCH_KEY:
281                 key = obj;
282                 break;
283         case OBJ_SEARCH_OBJECT:
284                 object = obj;
285                 key = object->id;
286                 break;
287         default:
288                 ast_assert(0);
289                 return 0;
290         }
291         return ast_str_hash(key);
292 }
293
294 /*! \brief comparator function for client objects */
295 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
296 {
297         const struct ast_sip_outbound_publish_state *object_left = obj;
298         const struct ast_sip_outbound_publish_state *object_right = arg;
299         const char *right_key = arg;
300         int cmp;
301
302         switch (flags & OBJ_SEARCH_MASK) {
303         case OBJ_SEARCH_OBJECT:
304                 right_key = object_right->id;
305                 /* Fall through */
306         case OBJ_SEARCH_KEY:
307                 cmp = strcmp(object_left->id, right_key);
308                 break;
309         case OBJ_SEARCH_PARTIAL_KEY:
310                 /* Not supported by container. */
311                 ast_assert(0);
312                 return 0;
313         default:
314                 cmp = 0;
315                 break;
316         }
317         if (cmp) {
318                 return 0;
319         }
320         return CMP_MATCH;
321 }
322
323 static struct ao2_container *get_publishes_and_update_state(void)
324 {
325         struct ao2_container *container;
326         SCOPED_WRLOCK(lock, &load_lock);
327
328         container = ast_sorcery_retrieve_by_fields(
329                 ast_sip_get_sorcery(), "outbound-publish",
330                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
331
332         if (!new_states) {
333                 return container;
334         }
335
336         ao2_global_obj_replace_unref(current_states, new_states);
337         ao2_cleanup(new_states);
338         new_states = NULL;
339
340         return container;
341 }
342
343 AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
344
345 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
346 {
347         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
348         ast_module_ref(ast_module_info->self);
349 }
350
351 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
352 {
353         struct ast_sip_event_publisher_handler *iter;
354
355         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
356                 if (!strcmp(iter->event_name, event_name)) {
357                         break;
358                 }
359         }
360         return iter;
361 }
362
363 /*! \brief Helper function which cancels the refresh timer on a publisher */
364 static void cancel_publish_refresh(struct sip_outbound_publisher *publisher)
365 {
366         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) {
367                 /* The timer was successfully cancelled, drop the refcount of the publisher */
368                 ao2_ref(publisher, -1);
369         }
370 }
371
372 /*! \brief Helper function which sets up the timer to send publication */
373 static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration)
374 {
375         struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish);
376         pj_time_val delay = { .sec = 0, };
377
378         cancel_publish_refresh(publisher);
379
380         if (expiration > 0) {
381                 delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
382         }
383         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
384                 delay.sec = publish->expiration;
385         }
386         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
387                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
388         }
389
390         ao2_ref(publisher, +1);
391         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) {
392                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
393                 ao2_ref(publisher, -1);
394         }
395         ao2_ref(publish, -1);
396 }
397
398 static int publisher_client_send(void *obj, void *arg, void *data, int flags);
399
400 /*! \brief Publish client timer callback function */
401 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
402 {
403         struct sip_outbound_publisher *publisher = entry->user_data;
404
405         ao2_lock(publisher);
406         if (AST_LIST_EMPTY(&publisher->queue)) {
407                 int res;
408                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
409                 publisher_client_send(publisher, NULL, &res, 0);
410         }
411         ao2_unlock(publisher);
412
413         ao2_ref(publisher, -1);
414 }
415
416 /*! \brief Task for cancelling a refresh timer */
417 static int cancel_refresh_timer_task(void *data)
418 {
419         struct sip_outbound_publisher *publisher = data;
420
421         cancel_publish_refresh(publisher);
422         ao2_ref(publisher, -1);
423
424         return 0;
425 }
426
427 static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata)
428 {
429         if (!ast_strlen_zero(publisher->owner->publish->transport)) {
430                 pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
431                 ast_sip_set_tpselector_from_transport_name(
432                         publisher->owner->publish->transport, &selector);
433                 pjsip_tx_data_set_transport(tdata, &selector);
434         }
435 }
436
437 /*! \brief Task for sending an unpublish */
438 static int send_unpublish_task(void *data)
439 {
440         struct sip_outbound_publisher *publisher = data;
441         pjsip_tx_data *tdata;
442
443         if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) {
444                 set_transport(publisher, tdata);
445                 pjsip_publishc_send(publisher->client, tdata);
446         }
447
448         ao2_ref(publisher, -1);
449
450         return 0;
451 }
452
453 static void stop_publishing(struct ast_sip_outbound_publish_client *client,
454                             struct ast_sip_event_publisher_handler *handler)
455 {
456         if (!handler) {
457                 handler = find_publisher_handler_for_event_name(client->publish->event);
458         }
459
460         if (handler) {
461                 handler->stop_publishing(client);
462         }
463 }
464
465 static int cancel_and_unpublish(void *obj, void *arg, int flags);
466
467 /*! \brief Helper function which starts or stops publish clients when applicable */
468 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
469 {
470         RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
471         struct ao2_container *states;
472         struct ao2_iterator i;
473         struct ast_sip_outbound_publish_state *state;
474
475         if (!publishes) {
476                 return;
477         }
478
479         states = ao2_global_obj_ref(current_states);
480         if (!states) {
481                 return;
482         }
483
484         i = ao2_iterator_init(states, 0);
485         while ((state = ao2_iterator_next(&i))) {
486                 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
487                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
488
489                 if (!state->client->started) {
490                         /* If the publisher client has not yet been started try to start it */
491                         if (!handler) {
492                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
493                                           publish->event, ast_sorcery_object_get_id(publish));
494                         } else if (handler->start_publishing(publish, state->client)) {
495                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
496                                         publish->event, ast_sorcery_object_get_id(publish));
497                         } else {
498                                 state->client->started = 1;
499                         }
500                 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
501                         stop_publishing(state->client, removed);
502                         ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL);
503                         state->client->started = 0;
504                 }
505                 ao2_ref(publish, -1);
506                 ao2_ref(state, -1);
507         }
508         ao2_iterator_destroy(&i);
509         ao2_ref(states, -1);
510 }
511
512 static struct ast_sip_outbound_publish_state *sip_publish_state_get(const char *id)
513 {
514         struct ao2_container *states = ao2_global_obj_ref(current_states);
515         struct ast_sip_outbound_publish_state *res;
516
517         if (!states) {
518                 return NULL;
519         }
520
521         res = ao2_find(states, id, OBJ_SEARCH_KEY);
522         ao2_ref(states, -1);
523         return res;
524 }
525
526 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
527 {
528         struct ast_sip_outbound_publish_state *state = sip_publish_state_get(name);
529
530         if (!state) {
531                 return NULL;
532         }
533
534         ao2_ref(state->client, +1);
535         ao2_ref(state, -1);
536         return state->client;
537 }
538
539 const char *ast_sip_publish_client_get_from_uri(struct ast_sip_outbound_publish_client *client)
540 {
541         struct ast_sip_outbound_publish *publish = client->publish;
542
543         return S_OR(publish->from_uri, S_OR(publish->server_uri, ""));
544 }
545
546 static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
547         struct ast_sip_outbound_publish_client *client, const char *user);
548
549 static struct sip_outbound_publisher *sip_outbound_publish_client_get_publisher(
550         struct ast_sip_outbound_publish_client *client, const char *user)
551 {
552         struct sip_outbound_publisher *publisher;
553
554         /*
555          * Lock before searching since there could be a race between searching and adding.
556          * Just use the load_lock since we might need to lock it anyway (if adding) and
557          * also it simplifies the code (otherwise we'd have to lock the publishers, no-
558          * lock the search and pass a flag to 'add publisher to no-lock the potential link).
559          */
560         ast_rwlock_wrlock(&load_lock);
561         publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY);
562         if (!publisher) {
563                 if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) {
564                         ast_rwlock_unlock(&load_lock);
565                         return NULL;
566                 }
567         }
568         ast_rwlock_unlock(&load_lock);
569
570         return publisher;
571 }
572
573 const char *ast_sip_publish_client_get_user_from_uri(struct ast_sip_outbound_publish_client *client, const char *user,
574         char *uri, size_t size)
575 {
576         struct sip_outbound_publisher *publisher;
577
578         publisher = sip_outbound_publish_client_get_publisher(client, user);
579         if (!publisher) {
580                 return NULL;
581         }
582
583         ast_copy_string(uri, publisher->from_uri, size);
584         ao2_ref(publisher, -1);
585
586         return uri;
587 }
588
589 const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_client *client)
590 {
591         struct ast_sip_outbound_publish *publish = client->publish;
592
593         return S_OR(publish->to_uri, S_OR(publish->server_uri, ""));
594 }
595
596 const char *ast_sip_publish_client_get_user_to_uri(struct ast_sip_outbound_publish_client *client, const char *user,
597         char *uri, size_t size)
598 {
599         struct sip_outbound_publisher *publisher;
600
601         publisher = sip_outbound_publish_client_get_publisher(client, user);
602         if (!publisher) {
603                 return NULL;
604         }
605
606         ast_copy_string(uri, publisher->to_uri, size);
607         ao2_ref(publisher, -1);
608
609         return uri;
610 }
611
612 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
613 {
614         struct ast_sip_event_publisher_handler *existing;
615         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
616
617         if (!handler->start_publishing || !handler->stop_publishing) {
618                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
619                 return -1;
620         } else if (ast_strlen_zero(handler->event_name)) {
621                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
622                 return -1;
623         }
624
625         existing = find_publisher_handler_for_event_name(handler->event_name);
626         if (existing) {
627                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
628                                 "A handler is already registered\n", handler->event_name);
629                 return -1;
630         }
631
632         sub_add_handler(handler);
633
634         sip_outbound_publish_synchronize(NULL);
635
636         return 0;
637 }
638
639 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
640 {
641         struct ast_sip_event_publisher_handler *iter;
642         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
643         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
644                 if (handler == iter) {
645                         AST_RWLIST_REMOVE_CURRENT(next);
646                         ast_module_unref(ast_module_info->self);
647                         break;
648                 }
649         }
650         AST_RWLIST_TRAVERSE_SAFE_END;
651
652         sip_outbound_publish_synchronize(handler);
653 }
654
655 /*! \brief Destructor function for publish information */
656 static void sip_outbound_publish_destroy(void *obj)
657 {
658         struct ast_sip_outbound_publish *publish = obj;
659
660         ast_sip_auth_vector_destroy(&publish->outbound_auths);
661
662         ast_string_field_free_memory(publish);
663 }
664
665 /*! \brief Allocator function for publish information */
666 static void *sip_outbound_publish_alloc(const char *name)
667 {
668         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
669                 sip_outbound_publish_destroy);
670
671         if (!publish || ast_string_field_init(publish, 256)) {
672                 ao2_cleanup(publish);
673                 return NULL;
674         }
675
676         return publish;
677 }
678
679 static void sip_outbound_publish_datastore_destroy(void *obj)
680 {
681         struct ast_datastore *datastore = obj;
682
683         /* Using the destroy function (if present) destroy the data */
684         if (datastore->info->destroy != NULL && datastore->data != NULL) {
685                 datastore->info->destroy(datastore->data);
686                 datastore->data = NULL;
687         }
688
689         ast_free((void *) datastore->uid);
690         datastore->uid = NULL;
691 }
692
693 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
694 {
695         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
696         const char *uid_ptr = uid;
697         char uuid_buf[AST_UUID_STR_LEN];
698
699         if (!info) {
700                 return NULL;
701         }
702
703         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
704         if (!datastore) {
705                 return NULL;
706         }
707
708         datastore->info = info;
709         if (ast_strlen_zero(uid)) {
710                 /* They didn't provide an ID so we'll provide one ourself */
711                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
712         }
713
714         datastore->uid = ast_strdup(uid_ptr);
715         if (!datastore->uid) {
716                 return NULL;
717         }
718
719         ao2_ref(datastore, +1);
720         return datastore;
721 }
722
723 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
724         struct ast_datastore *datastore)
725 {
726         ast_assert(datastore != NULL);
727         ast_assert(datastore->info != NULL);
728         ast_assert(!ast_strlen_zero(datastore->uid));
729
730         if (!ao2_link(client->datastores, datastore)) {
731                 return -1;
732         }
733         return 0;
734 }
735
736 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
737         const char *name)
738 {
739         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
740 }
741
742 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
743         const char *name)
744 {
745         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
746 }
747
748 static int sip_publisher_service_queue(void *data)
749 {
750         RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup);
751         SCOPED_AO2LOCK(lock, publisher);
752         struct sip_outbound_publish_message *message;
753         pjsip_tx_data *tdata;
754         pj_status_t status;
755
756         if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) {
757                 return 0;
758         }
759
760         if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
761                 goto fatal;
762         }
763
764         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
765                 ast_sip_add_body(tdata, &message->body)) {
766                 pjsip_tx_data_dec_ref(tdata);
767                 goto fatal;
768         }
769
770         set_transport(publisher, tdata);
771
772         status = pjsip_publishc_send(publisher->client, tdata);
773         if (status == PJ_EBUSY) {
774                 /* We attempted to send the message but something else got there first */
775                 goto service;
776         } else if (status != PJ_SUCCESS) {
777                 goto fatal;
778         }
779
780         publisher->sending = message;
781
782         return 0;
783
784 fatal:
785         AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
786         ast_free(message);
787
788 service:
789         if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
790                 ao2_ref(publisher, -1);
791         }
792         return -1;
793 }
794
795 static int publisher_client_send(void *obj, void *arg, void *data, int flags)
796 {
797         struct sip_outbound_publisher *publisher = obj;
798         const struct ast_sip_body *body = arg;
799         struct sip_outbound_publish_message *message;
800         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
801         int *res = data;
802         SCOPED_AO2LOCK(lock, publisher);
803
804         *res = -1;
805         if (!publisher->client) {
806                 return -1;
807         }
808
809         /* If a body is present we need more space for the contents of it */
810         if (body) {
811                 type_len = strlen(body->type) + 1;
812                 subtype_len = strlen(body->subtype) + 1;
813                 body_text_len = strlen(body->body_text) + 1;
814         }
815
816         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
817         if (!message) {
818                 return -1;
819         }
820
821         if (body) {
822                 char *dst = message->body_contents;
823
824                 message->body.type = strcpy(dst, body->type);
825                 dst += type_len;
826                 message->body.subtype = strcpy(dst, body->subtype);
827                 dst += subtype_len;
828                 message->body.body_text = strcpy(dst, body->body_text);
829         }
830
831         AST_LIST_INSERT_TAIL(&publisher->queue, message, entry);
832
833         *res = ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher));
834         if (*res) {
835                 ao2_ref(publisher, -1);
836         }
837
838         return *res;
839 }
840
841 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
842         const struct ast_sip_body *body)
843 {
844         SCOPED_AO2LOCK(lock, client);
845         int res = 0;
846
847         ao2_callback_data(client->publishers, OBJ_NODATA,
848                           publisher_client_send, (void *)body, &res);
849         return res;
850 }
851
852 static int sip_outbound_publisher_set_uri(
853         pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri)
854 {
855         pj_str_t tmp;
856         pjsip_uri *parsed;
857         pjsip_sip_uri *parsed_uri;
858         int size;
859
860         pj_strdup2_with_null(pool, &tmp, uri);
861         if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) {
862                 return -1;
863         }
864
865         if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
866                 return -1;
867         }
868
869         if (!ast_strlen_zero(user)) {
870                 pj_strdup2(pool, &parsed_uri->user, user);
871         }
872
873         res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size);
874         if (!res_uri->ptr) {
875                 return -1;
876         }
877
878         if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr,
879                                     pjsip_max_url_size - 1)) <= 0) {
880                 return -1;
881         }
882         res_uri->ptr[size] = '\0';
883         res_uri->slen = size;
884
885         return 0;
886 }
887
888 static int sip_outbound_publisher_set_uris(
889         pj_pool_t *pool, struct sip_outbound_publisher *publisher,
890         pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri)
891 {
892         struct ast_sip_outbound_publish *publish = publisher->owner->publish;
893
894         if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) {
895                 ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
896                         publish->server_uri, ast_sorcery_object_get_id(publish));
897                 return -1;
898         }
899
900         if (ast_strlen_zero(publish->to_uri)) {
901                 to_uri->ptr = server_uri->ptr;
902                 to_uri->slen = server_uri->slen;
903         } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) {
904                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
905                         publish->to_uri, ast_sorcery_object_get_id(publish));
906                 return -1;
907         }
908
909         publisher->to_uri = ast_strdup(to_uri->ptr);
910         if (!publisher->to_uri) {
911                 return -1;
912         }
913
914         if (ast_strlen_zero(publish->from_uri)) {
915                 from_uri->ptr = server_uri->ptr;
916                 from_uri->slen = server_uri->slen;
917         } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) {
918                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
919                         publish->from_uri, ast_sorcery_object_get_id(publish));
920                 return -1;
921         }
922
923         publisher->from_uri = ast_strdup(from_uri->ptr);
924         if (!publisher->from_uri) {
925                 return -1;
926         }
927
928         return 0;
929 }
930
931 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
932
933 /*! \brief Helper function that allocates a pjsip publish client and configures it */
934 static int sip_outbound_publisher_init(void *data)
935 {
936         struct sip_outbound_publisher *publisher = data;
937         RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
938         pjsip_publishc_opt opt = {
939                 .queue_request = PJ_FALSE,
940         };
941         pj_pool_t *pool;
942         pj_str_t event, server_uri, to_uri, from_uri;
943
944         if (publisher->client) {
945                 return 0;
946         }
947
948         if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt,
949                                   ao2_bump(publisher), sip_outbound_publish_callback,
950                 &publisher->client) != PJ_SUCCESS) {
951                 ao2_ref(publisher, -1);
952                 return -1;
953         }
954
955         publish = ao2_bump(publisher->owner->publish);
956
957         if (!ast_strlen_zero(publish->outbound_proxy)) {
958                 pjsip_route_hdr route_set, *route;
959                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
960
961                 pj_list_init(&route_set);
962
963                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME,
964                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
965                         pjsip_publishc_destroy(publisher->client);
966                         return -1;
967                 }
968                 pj_list_insert_nodes_before(&route_set, route);
969
970                 pjsip_publishc_set_route_set(publisher->client, &route_set);
971         }
972
973         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation",
974                                        pjsip_max_url_size, pjsip_max_url_size);
975         if (!pool) {
976                 ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
977                         ast_sorcery_object_get_id(publish));
978                 pjsip_publishc_destroy(publisher->client);
979                 return -1;
980         }
981
982         if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) {
983                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
984                 pjsip_publishc_destroy(publisher->client);
985                 return -1;
986         }
987
988         pj_cstr(&event, publish->event);
989         if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri,
990                         publish->expiration) != PJ_SUCCESS) {
991                 ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n",
992                         ast_sorcery_object_get_id(publish));
993                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
994                 pjsip_publishc_destroy(publisher->client);
995                 return -1;
996         }
997
998         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
999         return 0;
1000 }
1001
1002 static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags)
1003 {
1004         return sip_outbound_publisher_init(obj);
1005 }
1006
1007 static int sip_outbound_publisher_reinit_all(void *data)
1008 {
1009         ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL);
1010         return 0;
1011 }
1012
1013 /*! \brief Destructor function for publish client */
1014 static void sip_outbound_publisher_destroy(void *obj)
1015 {
1016         struct sip_outbound_publisher *publisher = obj;
1017         struct sip_outbound_publish_message *message;
1018
1019         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
1020
1021         while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
1022                 ast_free(message);
1023         }
1024
1025         ao2_cleanup(publisher->owner);
1026         ast_free(publisher->from_uri);
1027         ast_free(publisher->to_uri);
1028
1029         ast_taskprocessor_unreference(publisher->serializer);
1030 }
1031
1032 static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
1033         struct ast_sip_outbound_publish_client *client, const char *user)
1034 {
1035         struct sip_outbound_publisher *publisher;
1036         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1037
1038         publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1,
1039                               sip_outbound_publisher_destroy);
1040         if (!publisher) {
1041                 return NULL;
1042         }
1043
1044         /*
1045          * Bump the ref to the client. This essentially creates a circular reference,
1046          * but it is needed in order to make sure the client object doesn't get pulled
1047          * out from under us when the publisher stops publishing.
1048          *
1049          * The circular reference is alleviated by calling cancel_and_unpublish for
1050          * each client, from the state's destructor. By calling it there all references
1051          * to the publishers should go to zero, thus calling the publisher's destructor.
1052          * This in turn removes the client reference we added here. The state then removes
1053          * its reference to the client, which should take it to zero.
1054          */
1055         publisher->owner = ao2_bump(client);
1056         publisher->timer.user_data = publisher;
1057         publisher->timer.cb = sip_outbound_publish_timer_cb;
1058         if (user) {
1059                 strcpy(publisher->user, user);
1060         } else {
1061                 *publisher->user = '\0';
1062         }
1063
1064         ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
1065                 ast_sorcery_object_get_id(client->publish));
1066
1067         publisher->serializer = ast_sip_create_serializer_group(tps_name,
1068                 shutdown_group);
1069         if (!publisher->serializer) {
1070                 ao2_ref(publisher, -1);
1071                 return NULL;
1072         }
1073
1074         if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
1075                 ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
1076                         ast_sorcery_object_get_id(client->publish));
1077                 ao2_ref(publisher, -1);
1078                 return NULL;
1079         }
1080
1081         return publisher;
1082 }
1083
1084 static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
1085         struct ast_sip_outbound_publish_client *client, const char *user)
1086 {
1087         struct sip_outbound_publisher *publisher =
1088                 sip_outbound_publisher_alloc(client, user);
1089
1090         if (!publisher) {
1091                 return NULL;
1092         }
1093
1094         if (!ao2_link(client->publishers, publisher)) {
1095                 /*
1096                  * No need to bump the reference here. The task will take care of
1097                  * removing the reference.
1098                  */
1099                 if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, publisher)) {
1100                         ao2_ref(publisher, -1);
1101                 }
1102                 return NULL;
1103         }
1104
1105         return publisher;
1106 }
1107
1108 int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
1109                                      const char *user, const struct ast_sip_body *body)
1110 {
1111         struct sip_outbound_publisher *publisher;
1112         int res;
1113
1114         publisher = sip_outbound_publish_client_get_publisher(client, user);
1115         if (!publisher) {
1116                 return -1;
1117         }
1118
1119         publisher_client_send(publisher, (void *)body, &res, 0);
1120         ao2_ref(publisher, -1);
1121         return res;
1122 }
1123
1124 void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
1125                                    const char *user)
1126 {
1127         SCOPED_WRLOCK(lock, &load_lock);
1128         ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
1129 }
1130
1131 static int explicit_publish_destroy(void *data)
1132 {
1133         struct sip_outbound_publisher *publisher = data;
1134
1135         /*
1136          * If there is no pjsip publishing client then we obviously don't need
1137          * to destroy it. Also, the ref for the Asterisk publishing client that
1138          * pjsip had would not exist or should already be gone as well.
1139          */
1140         if (publisher->client) {
1141                 pjsip_publishc_destroy(publisher->client);
1142                 ao2_ref(publisher, -1);
1143         }
1144
1145         ao2_ref(publisher, -1);
1146
1147         return 0;
1148 }
1149
1150 /*! \brief Helper function which cancels and un-publishes a no longer used client */
1151 static int cancel_and_unpublish(void *obj, void *arg, int flags)
1152 {
1153         struct sip_outbound_publisher *publisher = obj;
1154         struct ast_sip_outbound_publish_client *client = publisher->owner;
1155
1156         SCOPED_AO2LOCK(lock, publisher);
1157
1158         if (!client->started) {
1159                 /* If the publisher was never started, there's nothing to unpublish, so just
1160                  * destroy the publication and remove its reference to the publisher.
1161                  */
1162                 if (ast_sip_push_task(publisher->serializer, explicit_publish_destroy, ao2_bump(publisher))) {
1163                         ao2_ref(publisher, -1);
1164                 }
1165                 return 0;
1166         }
1167
1168         if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, ao2_bump(publisher))) {
1169                 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
1170                         ast_sorcery_object_get_id(client->publish));
1171                 ao2_ref(publisher, -1);
1172         }
1173
1174         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
1175         if (!publisher->sending) {
1176                 if (ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
1177                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
1178                                 ast_sorcery_object_get_id(client->publish));
1179                         ao2_ref(publisher, -1);
1180                 }
1181         }
1182         publisher->destroy = 1;
1183         return 0;
1184 }
1185
1186 /*! \brief Destructor function for publish client */
1187 static void sip_outbound_publish_client_destroy(void *obj)
1188 {
1189         struct ast_sip_outbound_publish_client *client = obj;
1190
1191         ao2_cleanup(client->datastores);
1192
1193         /*
1194          * The client's publishers have already been unpublished and destroyed
1195          * by this point, so it is safe to finally remove the reference to the
1196          * publish object. The client needed to hold a reference to it until
1197          * the publishers were done with it.
1198          */
1199         ao2_cleanup(client->publish);
1200 }
1201
1202 /*! \brief Destructor function for publish state */
1203 static void sip_outbound_publish_state_destroy(void *obj)
1204 {
1205         struct ast_sip_outbound_publish_state *state = obj;
1206
1207         stop_publishing(state->client, NULL);
1208         /*
1209          * Since the state is being destroyed the associated client needs to also
1210          * be destroyed. However simply removing the reference to the client will
1211          * not initiate client destruction since the client's publisher(s) hold a
1212          * reference to the client object as well. So we need to unpublish the
1213          * the client's publishers here, which will remove the publisher's client
1214          * reference during that process.
1215          *
1216          * That being said we don't want to remove the client's reference to the
1217          * publish object just yet. We'll hold off on that until client destruction
1218          * itself. This is because the publishers need access to the client's
1219          * publish object while they are unpublishing.
1220          */
1221         ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL);
1222         ao2_cleanup(state->client->publishers);
1223
1224         state->client->started = 0;
1225         ao2_cleanup(state->client);
1226 }
1227
1228 /*!
1229  * \internal
1230  * \brief Check if a publish can be reused
1231  *
1232  * This checks if the existing outbound publish's configuration differs from a newly-applied
1233  * outbound publish.
1234  *
1235  * \param existing The pre-existing outbound publish
1236  * \param applied The newly-created publish
1237  */
1238 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
1239 {
1240         int i;
1241
1242         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
1243                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
1244                 strcmp(existing->event, applied->event) ||
1245                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
1246                 return 0;
1247         }
1248
1249         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
1250                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
1251                         return 0;
1252                 }
1253         }
1254
1255         return 1;
1256 }
1257
1258 /*! \brief Callback function for publish client responses */
1259 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
1260 {
1261 #define DESTROY_CLIENT() do {                      \
1262         pjsip_publishc_destroy(publisher->client); \
1263         publisher->client = NULL; \
1264         ao2_ref(publisher, -1); } while (0)
1265
1266         RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup);
1267         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup);
1268         SCOPED_AO2LOCK(lock, publisher);
1269         pjsip_tx_data *tdata;
1270
1271         if (publisher->destroy) {
1272                 if (publisher->sending) {
1273                         publisher->sending = NULL;
1274
1275                         if (!ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
1276                                 return;
1277                         }
1278                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
1279                                 ast_sorcery_object_get_id(publish));
1280                         ao2_ref(publisher, -1);
1281                 }
1282                 /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */
1283                 DESTROY_CLIENT();
1284                 return;
1285         }
1286
1287         if (param->code == 401 || param->code == 407) {
1288                 pjsip_transaction *tsx = pjsip_rdata_get_tsx(param->rdata);
1289
1290                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
1291                                 param->rdata, tsx->last_tx, &tdata)) {
1292                         set_transport(publisher, tdata);
1293                         pjsip_publishc_send(publisher->client, tdata);
1294                 }
1295                 publisher->auth_attempts++;
1296
1297                 if (publisher->auth_attempts == publish->max_auth_attempts) {
1298                         DESTROY_CLIENT();
1299                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
1300                                 ast_sorcery_object_get_id(publish));
1301
1302                         goto end;
1303                 }
1304                 return;
1305         }
1306
1307         publisher->auth_attempts = 0;
1308
1309         if (param->code == 412) {
1310                 DESTROY_CLIENT();
1311                 if (sip_outbound_publisher_init(publisher)) {
1312                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
1313                                 ast_sorcery_object_get_id(publish));
1314                         goto end;
1315                 }
1316
1317                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
1318                 publisher->sending = NULL;
1319         } else if (param->code == 423) {
1320                 /* Update the expiration with the new expiration time if available */
1321                 pjsip_expires_hdr *expires;
1322
1323                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
1324                 if (!expires || !expires->ivalue) {
1325                         DESTROY_CLIENT();
1326                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
1327                                 ast_sorcery_object_get_id(publish));
1328                         goto end;
1329                 }
1330
1331                 pjsip_publishc_update_expires(publisher->client, expires->ivalue);
1332                 publisher->sending = NULL;
1333         } else if (publisher->sending) {
1334                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
1335                 AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
1336                 ast_free(publisher->sending);
1337                 publisher->sending = NULL;
1338                 if (!param->rdata) {
1339                         ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
1340                                 ast_sorcery_object_get_id(publish));
1341                 }
1342         }
1343
1344         if (AST_LIST_EMPTY(&publisher->queue)) {
1345                 schedule_publish_refresh(publisher, param->expiration);
1346         }
1347
1348 end:
1349         if (!publisher->client) {
1350                 struct sip_outbound_publish_message *message;
1351
1352                 while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
1353                         ast_free(message);
1354                 }
1355         } else {
1356                 if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
1357                         ao2_ref(publisher, -1);
1358                 }
1359         }
1360 }
1361
1362 #define DATASTORE_BUCKETS 53
1363
1364 static int datastore_hash(const void *obj, int flags)
1365 {
1366         const struct ast_datastore *datastore;
1367         const char *uid;
1368
1369         switch (flags & OBJ_SEARCH_MASK) {
1370         case OBJ_SEARCH_KEY:
1371                 uid = obj;
1372                 break;
1373         case OBJ_SEARCH_OBJECT:
1374                 datastore = obj;
1375                 uid = datastore->uid;
1376                 break;
1377         default:
1378                 /* Hash can only work on something with a full key. */
1379                 ast_assert(0);
1380                 return 0;
1381         }
1382
1383         return ast_str_hash(uid);
1384 }
1385
1386 static int datastore_cmp(void *obj, void *arg, int flags)
1387 {
1388         const struct ast_datastore *object_left = obj;
1389         const struct ast_datastore *object_right = arg;
1390         const char *right_key = arg;
1391         int cmp;
1392
1393         switch (flags & OBJ_SEARCH_MASK) {
1394         case OBJ_SEARCH_OBJECT:
1395                 right_key = object_right->uid;
1396                 /* Fall through */
1397         case OBJ_SEARCH_KEY:
1398                 cmp = strcmp(object_left->uid, right_key);
1399                 break;
1400         case OBJ_SEARCH_PARTIAL_KEY:
1401         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
1402                 break;
1403         default:
1404                 /*
1405                  * What arg points to is specific to this traversal callback
1406                  * and has no special meaning to astobj2.
1407                  */
1408                 cmp = 0;
1409                 break;
1410         }
1411         if (cmp) {
1412                 return 0;
1413         }
1414         /*
1415          * At this point the traversal callback is identical to a sorted
1416          * container.
1417          */
1418         return CMP_MATCH;
1419 }
1420
1421 /*! \brief Allocator function for publish client */
1422 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1423         struct ast_sip_outbound_publish *publish)
1424 {
1425         const char *id = ast_sorcery_object_get_id(publish);
1426         struct ast_sip_outbound_publish_state *state =
1427                 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1428
1429         if (!state) {
1430                 return NULL;
1431         }
1432
1433         state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1434         if (!state->client) {
1435                 ao2_ref(state, -1);
1436                 return NULL;
1437         }
1438
1439         state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1440         if (!state->client->datastores) {
1441                 ao2_ref(state, -1);
1442                 return NULL;
1443         }
1444
1445         state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn,
1446                                                         sip_outbound_publisher_cmp_fn);
1447         if (!state->client->publishers) {
1448                 ao2_ref(state, -1);
1449                 return NULL;
1450         }
1451
1452         state->client->publish = ao2_bump(publish);
1453
1454         strcpy(state->id, id);
1455         return state;
1456 }
1457
1458 static int validate_publish_config(struct ast_sip_outbound_publish *publish)
1459 {
1460         if (ast_strlen_zero(publish->server_uri)) {
1461                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1462                         ast_sorcery_object_get_id(publish));
1463                 return -1;
1464         } else if (ast_sip_validate_uri_length(publish->server_uri)) {
1465                 ast_log(LOG_ERROR, "Server URI or hostname length exceeds pjproject limit or is not a sip(s) uri: '%s' on outbound publish '%s'\n",
1466                         publish->server_uri,
1467                         ast_sorcery_object_get_id(publish));
1468                 return -1;
1469         } else if (ast_strlen_zero(publish->event)) {
1470                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1471                         ast_sorcery_object_get_id(publish));
1472                 return -1;
1473         } else if (!ast_strlen_zero(publish->from_uri)
1474                 && ast_sip_validate_uri_length(publish->from_uri)) {
1475                 ast_log(LOG_ERROR, "From URI or hostname length exceeds pjproject limit or is not a sip(s) uri: '%s' on outbound publish '%s'\n",
1476                         publish->from_uri,
1477                         ast_sorcery_object_get_id(publish));
1478                 return -1;
1479         } else if (!ast_strlen_zero(publish->to_uri)
1480                 && ast_sip_validate_uri_length(publish->to_uri)) {
1481                 ast_log(LOG_ERROR, "To URI or hostname length exceeds pjproject limit or is not a sip(s) uri: '%s' on outbound publish '%s'\n",
1482                         publish->to_uri,
1483                         ast_sorcery_object_get_id(publish));
1484                 return -1;
1485         }
1486         return 0;
1487 }
1488
1489 static int current_state_reusable(struct ast_sip_outbound_publish *publish,
1490                                   struct ast_sip_outbound_publish_state *current_state)
1491 {
1492         struct ast_sip_outbound_publish *old_publish;
1493
1494         /*
1495          * Don't maintain the old state/client objects if the multi_user option changed.
1496          */
1497         if ((!publish->multi_user && current_state->client->publish->multi_user) ||
1498             (publish->multi_user && !current_state->client->publish->multi_user)) {
1499                 return 0;
1500         }
1501
1502
1503         if (!can_reuse_publish(current_state->client->publish, publish)) {
1504                 /*
1505                  * Something significant has changed in the configuration, so we are
1506                  * unable to use the old state object. The current state needs to go
1507                  * away and a new one needs to be created.
1508                  */
1509                 return 0;
1510         }
1511
1512         /*
1513          * We can reuse the current state object so keep it, but swap out the
1514          * underlying publish object with the new one.
1515          */
1516         old_publish = current_state->client->publish;
1517         current_state->client->publish = publish;
1518         if (ast_sip_push_task_synchronous(
1519                     NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
1520                 /*
1521                  * If the state object fails to re-initialize then swap
1522                  * the old publish info back in.
1523                  */
1524                 current_state->client->publish = publish;
1525                 ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n",
1526                         ast_sorcery_object_get_id(current_state->client->publish));
1527                 return -1;
1528         }
1529
1530         /*
1531          * Since we swapped out the publish object the new one needs a ref
1532          * while the old one needs to go away.
1533          */
1534         ao2_ref(current_state->client->publish, +1);
1535         ao2_cleanup(old_publish);
1536
1537         /* Tell the caller that the current state object should be used */
1538         return 1;
1539 }
1540
1541 /*! \brief Apply function which finds or allocates a state structure */
1542 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1543 {
1544 #define ADD_TO_NEW_STATES(__obj) \
1545         do { if (__obj) { \
1546              ao2_link(new_states, __obj); \
1547              ao2_ref(__obj, -1); } } while (0)
1548
1549         struct ast_sip_outbound_publish *applied = obj;
1550         struct ast_sip_outbound_publish_state *current_state, *new_state;
1551         struct sip_outbound_publisher *publisher = NULL;
1552         int res;
1553
1554         /*
1555          * New states are being loaded or reloaded. We'll need to add the new
1556          * object if created/updated, or keep the old object if an error occurs.
1557          */
1558         if (!new_states) {
1559                 new_states = ao2_container_alloc_options(
1560                         AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1561                         outbound_publish_state_hash, outbound_publish_state_cmp);
1562
1563                 if (!new_states) {
1564                         ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1565                         return -1;
1566                 }
1567         }
1568
1569         /* If there is current state we'll want to maintain it if any errors occur */
1570         current_state = sip_publish_state_get(ast_sorcery_object_get_id(applied));
1571
1572         if ((res = validate_publish_config(applied))) {
1573                 ADD_TO_NEW_STATES(current_state);
1574                 return res;
1575         }
1576
1577         if (current_state && (res = current_state_reusable(applied, current_state))) {
1578                 /*
1579                  * The current state object was able to be reused, or an error
1580                  * occurred. Either way we keep the current state and be done.
1581                  */
1582                 ADD_TO_NEW_STATES(current_state);
1583                 return res == 1 ? 0 : -1;
1584         }
1585
1586         /*
1587          * No current state was found or it was unable to be reused. Either way
1588          * we'll need to create a new state object.
1589          */
1590         new_state = sip_outbound_publish_state_alloc(applied);
1591         if (!new_state) {
1592                 ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1593                         ast_sorcery_object_get_id(applied));
1594                 ADD_TO_NEW_STATES(current_state);
1595                 return -1;
1596         };
1597
1598         if (!applied->multi_user &&
1599             !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) {
1600                 ADD_TO_NEW_STATES(current_state);
1601                 ao2_ref(new_state, -1);
1602                 return -1;
1603         }
1604         ao2_cleanup(publisher);
1605
1606         ADD_TO_NEW_STATES(new_state);
1607         ao2_cleanup(current_state);
1608         return res;
1609 }
1610
1611 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1612 {
1613         struct ast_sip_outbound_publish *publish = obj;
1614
1615         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
1616 }
1617
1618
1619 static int unload_module(void)
1620 {
1621         int remaining;
1622
1623         ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
1624
1625         ao2_global_obj_release(current_states);
1626
1627         /* Wait for publication serializers to get destroyed. */
1628         ast_debug(2, "Waiting for publication to complete for unload.\n");
1629         remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
1630         if (remaining) {
1631                 ast_log(LOG_WARNING, "Unload incomplete.  Could not stop %d outbound publications.  Try again later.\n",
1632                         remaining);
1633                 return -1;
1634         }
1635
1636         ast_debug(2, "Successful shutdown.\n");
1637
1638         ao2_cleanup(shutdown_group);
1639         shutdown_group = NULL;
1640
1641         return 0;
1642 }
1643
1644 static int load_module(void)
1645 {
1646         CHECK_PJSIP_MODULE_LOADED();
1647
1648         /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
1649         ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size);
1650
1651         shutdown_group = ast_serializer_shutdown_group_alloc();
1652         if (!shutdown_group) {
1653                 return AST_MODULE_LOAD_DECLINE;
1654         }
1655
1656         ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1657         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1658
1659         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1660                 sip_outbound_publish_apply)) {
1661                 ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
1662                 unload_module();
1663                 return AST_MODULE_LOAD_DECLINE;
1664         }
1665
1666         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1667         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1668         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1669         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1670         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1671         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1672         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1673         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1674         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
1675         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1676         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
1677
1678         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1679
1680         AST_RWLIST_RDLOCK(&publisher_handlers);
1681         sip_outbound_publish_synchronize(NULL);
1682         AST_RWLIST_UNLOCK(&publisher_handlers);
1683
1684         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1685
1686         return AST_MODULE_LOAD_SUCCESS;
1687 }
1688
1689 static int reload_module(void)
1690 {
1691         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1692
1693         AST_RWLIST_RDLOCK(&publisher_handlers);
1694         sip_outbound_publish_synchronize(NULL);
1695         AST_RWLIST_UNLOCK(&publisher_handlers);
1696         return 0;
1697 }
1698
1699 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
1700         .load = load_module,
1701         .reload = reload_module,
1702         .unload = unload_module,
1703         .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1704 );