0aad5fcdb643939e220b09fe9c1cb7e9308b78a8
[asterisk/asterisk.git] / res / res_pjsip_outbound_publish.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend>pjproject</depend>
21         <depend>res_pjsip</depend>
22         <support_level>core</support_level>
23  ***/
24
25 #include "asterisk.h"
26
27 #include <pjsip.h>
28 #include <pjsip_simple.h>
29
30 #include "asterisk/res_pjproject.h"
31 #include "asterisk/res_pjsip.h"
32 #include "asterisk/res_pjsip_outbound_publish.h"
33 #include "asterisk/module.h"
34 #include "asterisk/taskprocessor.h"
35 #include "asterisk/threadpool.h"
36 #include "asterisk/datastore.h"
37
38 /*** DOCUMENTATION
39         <configInfo name="res_pjsip_outbound_publish" language="en_US">
40                 <synopsis>SIP resource for outbound publish</synopsis>
41                 <description><para>
42                         <emphasis>Outbound Publish</emphasis>
43                         </para>
44                         <para>This module allows <literal>res_pjsip</literal> to publish to other SIP servers.</para>
45                 </description>
46                 <configFile name="pjsip.conf">
47                         <configObject name="outbound-publish">
48                                 <synopsis>The configuration for outbound publish</synopsis>
49                                 <description><para>
50                                         Publish is <emphasis>COMPLETELY</emphasis> separate from the rest of
51                                         <literal>pjsip.conf</literal>. A minimal configuration consists of
52                                         setting a <literal>server_uri</literal> and <literal>event</literal>.
53                                 </para></description>
54                                 <configOption name="expiration" default="3600">
55                                         <synopsis>Expiration time for publications in seconds</synopsis>
56                                 </configOption>
57                                 <configOption name="outbound_auth" default="">
58                                         <synopsis>Authentication object to be used for outbound publishes.</synopsis>
59                                 </configOption>
60                                 <configOption name="outbound_proxy" default="">
61                                         <synopsis>SIP URI of the outbound proxy used to send publishes</synopsis>
62                                 </configOption>
63                                 <configOption name="server_uri">
64                                         <synopsis>SIP URI of the server and entity to publish to</synopsis>
65                                         <description><para>
66                                                 This is the URI at which to find the entity and server to send the outbound PUBLISH to.
67                                                 This URI is used as the request URI of the outbound PUBLISH request from Asterisk.
68                                         </para></description>
69                                 </configOption>
70                                 <configOption name="from_uri">
71                                         <synopsis>SIP URI to use in the From header</synopsis>
72                                         <description><para>
73                                                 This is the URI that will be placed into the From header of outgoing PUBLISH
74                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
75                                                 will be used.
76                                         </para></description>
77                                 </configOption>
78                                 <configOption name="to_uri">
79                                         <synopsis>SIP URI to use in the To header</synopsis>
80                                         <description><para>
81                                                 This is the URI that will be placed into the To header of outgoing PUBLISH
82                                                 messages. If no URI is specified then the URI provided in <literal>server_uri</literal>
83                                                 will be used.
84                                         </para></description>
85                                 </configOption>
86                                 <configOption name="event" default="">
87                                         <synopsis>Event type of the PUBLISH.</synopsis>
88                                 </configOption>
89                                 <configOption name="max_auth_attempts" default="5">
90                                         <synopsis>Maximum number of authentication attempts before stopping the publication.</synopsis>
91                                 </configOption>
92                                 <configOption name="transport">
93                                         <synopsis>Transport used for outbound publish</synopsis>
94                                         <description>
95                                                 <note><para>A <replaceable>transport</replaceable> configured in
96                                                 <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
97                                         </description>
98                                 </configOption>
99                                 <configOption name="multi_user" default="no">
100                                         <synopsis>Enable multi-user support</synopsis>
101                                         <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
102                                 </configOption>
103                                 <configOption name="type">
104                                         <synopsis>Must be of type 'outbound-publish'.</synopsis>
105                                 </configOption>
106                         </configObject>
107                 </configFile>
108         </configInfo>
109  ***/
110
/*! \brief File-local copy of the PJSIP_MAX_URL_SIZE constant */
static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE;
112
113 /*! \brief Queued outbound publish message */
114 struct sip_outbound_publish_message {
115         /*! \brief Optional body */
116         struct ast_sip_body body;
117         /*! \brief Linked list information */
118         AST_LIST_ENTRY(sip_outbound_publish_message) entry;
119         /*! \brief Extra space for body contents */
120         char body_contents[0];
121 };
122
/*
 * A note about some of the object types used in this module:
 *
 * The reason we currently have 4 separate object types that relate to configuration,
 * publishing, state, and client information is due to object lifetimes and order of
 * destruction dependencies.
 *
 * Separation of concerns is a good thing and of course it makes sense to have a
 * configuration object type as well as an object type wrapper around pjsip's publishing
 * client class. There also may be run time state data that needs to be tracked, so
 * again having something to handle that is prudent. However, it may be tempting to think
 * "why not combine the state and client object types?" Especially seeing as how they have
 * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit
 * more awkward.
 *
 * Currently this module maintains a global container of current state objects. When this
 * states container is replaced, or deleted, it un-references all contained objects. Any
 * states with a reference left have probably been carried over from a reload/realtime fetch.
 * States not carried over are destructed and the associated client (and all its publishers)
 * get unpublished.
 *
 * This "unpublishing" goes through a careful process of unpublishing the client, all its
 * publishers, and making sure all the appropriate references are removed in a sane order.
 * This process is essentially kicked off with the destruction of the state. If the state
 * and client objects were to be merged, where clients became the globally tracked object
 * type, this "unpublishing" process would never start because of the multiple references
 * held to the client object over its lifetime. Meaning the global tracking container
 * would remove its reference to the client object when done with it, but other sources
 * would still be holding a reference to it (namely the datastore and publisher(s)).
 *
 * Thus at this time it is easier to keep them separate.
 */
155
/*!
 * \brief Outbound publish information
 *
 * Configuration object for one "outbound-publish" section; it is managed by
 * sorcery (hence the SORCERY_OBJECT header that must come first).
 */
struct ast_sip_outbound_publish {
	/*! \brief Sorcery object details */
	SORCERY_OBJECT(details);
	/*! \brief Stringfields */
	AST_DECLARE_STRING_FIELDS(
		/*! \brief URI for the entity and server */
		AST_STRING_FIELD(server_uri);
		/*! \brief URI for the From header (server_uri is used when empty) */
		AST_STRING_FIELD(from_uri);
		/*! \brief URI for the To header (server_uri is used when empty) */
		AST_STRING_FIELD(to_uri);
		/*! \brief Explicit transport to use for publish */
		AST_STRING_FIELD(transport);
		/*! \brief Outbound proxy to use */
		AST_STRING_FIELD(outbound_proxy);
		/*! \brief The event type to publish */
		AST_STRING_FIELD(event);
	);
	/*! \brief Requested expiration time, in seconds */
	unsigned int expiration;
	/*! \brief Maximum number of auth attempts before stopping the publish client */
	unsigned int max_auth_attempts;
	/*! \brief Configured authentication credentials */
	struct ast_sip_auth_vector outbound_auths;
	/*! \brief The publishing client is used for multiple users when true */
	unsigned int multi_user;
};
184
185 struct sip_outbound_publisher {
186         /*! \brief The client object that 'owns' this client
187
188              \note any potential circular reference problems are accounted
189              for (see publisher alloc for more information)
190         */
191         struct ast_sip_outbound_publish_client *owner;
192         /*! \brief Underlying publish client */
193         pjsip_publishc *client;
194         /*! \brief The From URI for this specific publisher */
195         char *from_uri;
196         /*! \brief The To URI for this specific publisher */
197         char *to_uri;
198         /*! \brief Timer entry for refreshing publish */
199         pj_timer_entry timer;
200         /*! \brief The number of auth attempts done */
201         unsigned int auth_attempts;
202         /*! \brief Queue of outgoing publish messages to send*/
203         AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
204         /*! \brief The message currently being sent */
205         struct sip_outbound_publish_message *sending;
206         /*! \brief Publish client should be destroyed */
207         unsigned int destroy;
208         /*! \brief Serializer for stuff and things */
209         struct ast_taskprocessor *serializer;
210         /*! \brief User, if any, associated with the publisher */
211         char user[0];
212 };
213
/*!
 * \brief Outbound publish client state information (persists for lifetime of a publish)
 *
 * Ties a publish configuration to its datastores and to the publishers that
 * send on its behalf.
 */
struct ast_sip_outbound_publish_client {
	/*! \brief Outbound publish information */
	struct ast_sip_outbound_publish *publish;
	/*! \brief Publisher datastores set up by handlers */
	struct ao2_container *datastores;
	/*! \brief Container of all the client publishing objects */
	struct ao2_container *publishers;
	/*! \brief Publishing has been fully started and event type informed */
	unsigned int started;
};
225
/*!
 * \brief Outbound publish state information (persists for lifetime of a publish)
 *
 * State objects are what the global states container holds; each one wraps
 * exactly one client and is looked up by \c id.
 */
struct ast_sip_outbound_publish_state {
	/*! \brief Outbound publish client */
	struct ast_sip_outbound_publish_client *client;
	/*! \brief Publish state id lookup key - same as publish configuration id (C99 flexible array member) */
	char id[];
};
233
/*!
 * \brief Used for locking while loading/reloading
 *
 * Multi-user configurations make it so publishers can be dynamically added and
 * removed. Publishers should not be added or removed during a [re]load since
 * it could cause the current_clients container to be out of sync. Thus the
 * reason for this lock.
 *
 * \note NOTE(review): no container named current_clients is visible here;
 * this presumably refers to the current_states container - confirm.
 */
AST_RWLOCK_DEFINE_STATIC(load_lock);

/*! \brief Bucket count, presumably for a client's publishers container - TODO confirm */
#define DEFAULT_PUBLISHER_BUCKETS 119
/* Generate hash and compare callbacks keyed on a publisher's user string */
AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user);
AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user);

/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
#define MAX_UNLOAD_TIMEOUT_TIME         35      /* Seconds */

/*!
 * Shutdown group to monitor publisher serializers.
 *
 * NOTE(review): the previous wording named
 * sip_outbound_registration_client_state, which looks copied from the
 * outbound registration module - confirm.
 */
static struct ast_serializer_shutdown_group *shutdown_group;

/*! \brief Default number of client state container buckets */
#define DEFAULT_STATE_BUCKETS 31
/*! \brief Global holder of the container of currently active publish states */
static AO2_GLOBAL_OBJ_STATIC(current_states);
/*! \brief Used on [re]loads to hold new state data */
static struct ao2_container *new_states;
259
260 /*! \brief hashing function for state objects */
261 static int outbound_publish_state_hash(const void *obj, const int flags)
262 {
263         const struct ast_sip_outbound_publish_state *object;
264         const char *key;
265
266         switch (flags & OBJ_SEARCH_MASK) {
267         case OBJ_SEARCH_KEY:
268                 key = obj;
269                 break;
270         case OBJ_SEARCH_OBJECT:
271                 object = obj;
272                 key = object->id;
273                 break;
274         default:
275                 ast_assert(0);
276                 return 0;
277         }
278         return ast_str_hash(key);
279 }
280
281 /*! \brief comparator function for client objects */
282 static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
283 {
284         const struct ast_sip_outbound_publish_state *object_left = obj;
285         const struct ast_sip_outbound_publish_state *object_right = arg;
286         const char *right_key = arg;
287         int cmp;
288
289         switch (flags & OBJ_SEARCH_MASK) {
290         case OBJ_SEARCH_OBJECT:
291                 right_key = object_right->id;
292                 /* Fall through */
293         case OBJ_SEARCH_KEY:
294                 cmp = strcmp(object_left->id, right_key);
295                 break;
296         case OBJ_SEARCH_PARTIAL_KEY:
297                 /* Not supported by container. */
298                 ast_assert(0);
299                 return 0;
300         default:
301                 cmp = 0;
302                 break;
303         }
304         if (cmp) {
305                 return 0;
306         }
307         return CMP_MATCH;
308 }
309
310 static struct ao2_container *get_publishes_and_update_state(void)
311 {
312         struct ao2_container *container;
313         SCOPED_WRLOCK(lock, &load_lock);
314
315         container = ast_sorcery_retrieve_by_fields(
316                 ast_sip_get_sorcery(), "outbound-publish",
317                 AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
318
319         if (!new_states) {
320                 return container;
321         }
322
323         ao2_global_obj_replace_unref(current_states, new_states);
324         ao2_cleanup(new_states);
325         new_states = NULL;
326
327         return container;
328 }
329
/*! \brief List of registered event publisher handlers */
AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
331
332 static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
333 {
334         AST_RWLIST_INSERT_TAIL(&publisher_handlers, handler, next);
335         ast_module_ref(ast_module_info->self);
336 }
337
338 static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_name(const char *event_name)
339 {
340         struct ast_sip_event_publisher_handler *iter;
341
342         AST_RWLIST_TRAVERSE(&publisher_handlers, iter, next) {
343                 if (!strcmp(iter->event_name, event_name)) {
344                         break;
345                 }
346         }
347         return iter;
348 }
349
350 /*! \brief Helper function which cancels the refresh timer on a publisher */
351 static void cancel_publish_refresh(struct sip_outbound_publisher *publisher)
352 {
353         if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) {
354                 /* The timer was successfully cancelled, drop the refcount of the publisher */
355                 ao2_ref(publisher, -1);
356         }
357 }
358
359 /*! \brief Helper function which sets up the timer to send publication */
360 static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration)
361 {
362         struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish);
363         pj_time_val delay = { .sec = 0, };
364
365         cancel_publish_refresh(publisher);
366
367         if (expiration > 0) {
368                 delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
369         }
370         if (publish->expiration && ((delay.sec > publish->expiration) || !delay.sec)) {
371                 delay.sec = publish->expiration;
372         }
373         if (delay.sec < PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH) {
374                 delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
375         }
376
377         ao2_ref(publisher, +1);
378         if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) {
379                 ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
380                 ao2_ref(publisher, -1);
381         }
382         ao2_ref(publish, -1);
383 }
384
/* Forward declaration: called by the refresh timer callback before its definition */
static int publisher_client_send(void *obj, void *arg, void *data, int flags);
386
387 /*! \brief Publish client timer callback function */
388 static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
389 {
390         struct sip_outbound_publisher *publisher = entry->user_data;
391
392         ao2_lock(publisher);
393         if (AST_LIST_EMPTY(&publisher->queue)) {
394                 int res;
395                 /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
396                 publisher_client_send(publisher, NULL, &res, 0);
397         }
398         ao2_unlock(publisher);
399
400         ao2_ref(publisher, -1);
401 }
402
/*!
 * \brief Task for cancelling a refresh timer
 *
 * \param data The publisher; the reference passed in with it is released
 *
 * \return Always 0
 */
static int cancel_refresh_timer_task(void *data)
{
	struct sip_outbound_publisher *target = data;

	cancel_publish_refresh(target);

	/* Release the reference that came with the task */
	ao2_ref(target, -1);
	return 0;
}
413
414 static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata)
415 {
416         if (!ast_strlen_zero(publisher->owner->publish->transport)) {
417                 pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
418                 ast_sip_set_tpselector_from_transport_name(
419                         publisher->owner->publish->transport, &selector);
420                 pjsip_tx_data_set_transport(tdata, &selector);
421         }
422 }
423
424 /*! \brief Task for sending an unpublish */
425 static int send_unpublish_task(void *data)
426 {
427         struct sip_outbound_publisher *publisher = data;
428         pjsip_tx_data *tdata;
429
430         if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) {
431                 set_transport(publisher, tdata);
432                 pjsip_publishc_send(publisher->client, tdata);
433         }
434
435         ao2_ref(publisher, -1);
436
437         return 0;
438 }
439
440 static void stop_publishing(struct ast_sip_outbound_publish_client *client,
441                             struct ast_sip_event_publisher_handler *handler)
442 {
443         if (!handler) {
444                 handler = find_publisher_handler_for_event_name(client->publish->event);
445         }
446
447         if (handler) {
448                 handler->stop_publishing(client);
449         }
450 }
451
/* Forward declaration: used as an ao2_callback function before its definition */
static int cancel_and_unpublish(void *obj, void *arg, int flags);
453
454 /*! \brief Helper function which starts or stops publish clients when applicable */
455 static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
456 {
457         RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
458         struct ao2_container *states;
459         struct ao2_iterator i;
460         struct ast_sip_outbound_publish_state *state;
461
462         if (!publishes) {
463                 return;
464         }
465
466         states = ao2_global_obj_ref(current_states);
467         if (!states) {
468                 return;
469         }
470
471         i = ao2_iterator_init(states, 0);
472         while ((state = ao2_iterator_next(&i))) {
473                 struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
474                 struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
475
476                 if (!state->client->started) {
477                         /* If the publisher client has not yet been started try to start it */
478                         if (!handler) {
479                                 ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
480                                           publish->event, ast_sorcery_object_get_id(publish));
481                         } else if (handler->start_publishing(publish, state->client)) {
482                                 ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
483                                         publish->event, ast_sorcery_object_get_id(publish));
484                         } else {
485                                 state->client->started = 1;
486                         }
487                 } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
488                         stop_publishing(state->client, removed);
489                         ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL);
490                         state->client->started = 0;
491                 }
492                 ao2_ref(publish, -1);
493                 ao2_ref(state, -1);
494         }
495         ao2_iterator_destroy(&i);
496         ao2_ref(states, -1);
497 }
498
499 static struct ast_sip_outbound_publish_state *sip_publish_state_get(const char *id)
500 {
501         struct ao2_container *states = ao2_global_obj_ref(current_states);
502         struct ast_sip_outbound_publish_state *res;
503
504         if (!states) {
505                 return NULL;
506         }
507
508         res = ao2_find(states, id, OBJ_SEARCH_KEY);
509         ao2_ref(states, -1);
510         return res;
511 }
512
513 struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
514 {
515         struct ast_sip_outbound_publish_state *state = sip_publish_state_get(name);
516
517         if (!state) {
518                 return NULL;
519         }
520
521         ao2_ref(state->client, +1);
522         ao2_ref(state, -1);
523         return state->client;
524 }
525
526 const char *ast_sip_publish_client_get_from_uri(struct ast_sip_outbound_publish_client *client)
527 {
528         struct ast_sip_outbound_publish *publish = client->publish;
529
530         return S_OR(publish->from_uri, S_OR(publish->server_uri, ""));
531 }
532
/* Forward declaration: called by the publisher lookup below when no publisher exists yet */
static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
	struct ast_sip_outbound_publish_client *client, const char *user);
535
536 static struct sip_outbound_publisher *sip_outbound_publish_client_get_publisher(
537         struct ast_sip_outbound_publish_client *client, const char *user)
538 {
539         struct sip_outbound_publisher *publisher;
540
541         /*
542          * Lock before searching since there could be a race between searching and adding.
543          * Just use the load_lock since we might need to lock it anyway (if adding) and
544          * also it simplifies the code (otherwise we'd have to lock the publishers, no-
545          * lock the search and pass a flag to 'add publisher to no-lock the potential link).
546          */
547         ast_rwlock_wrlock(&load_lock);
548         publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY);
549         if (!publisher) {
550                 if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) {
551                         ast_rwlock_unlock(&load_lock);
552                         return NULL;
553                 }
554         }
555         ast_rwlock_unlock(&load_lock);
556
557         return publisher;
558 }
559
560 const char *ast_sip_publish_client_get_user_from_uri(struct ast_sip_outbound_publish_client *client, const char *user,
561         char *uri, size_t size)
562 {
563         struct sip_outbound_publisher *publisher;
564
565         publisher = sip_outbound_publish_client_get_publisher(client, user);
566         if (!publisher) {
567                 return NULL;
568         }
569
570         ast_copy_string(uri, publisher->from_uri, size);
571         ao2_ref(publisher, -1);
572
573         return uri;
574 }
575
576 const char *ast_sip_publish_client_get_to_uri(struct ast_sip_outbound_publish_client *client)
577 {
578         struct ast_sip_outbound_publish *publish = client->publish;
579
580         return S_OR(publish->to_uri, S_OR(publish->server_uri, ""));
581 }
582
583 const char *ast_sip_publish_client_get_user_to_uri(struct ast_sip_outbound_publish_client *client, const char *user,
584         char *uri, size_t size)
585 {
586         struct sip_outbound_publisher *publisher;
587
588         publisher = sip_outbound_publish_client_get_publisher(client, user);
589         if (!publisher) {
590                 return NULL;
591         }
592
593         ast_copy_string(uri, publisher->to_uri, size);
594         ao2_ref(publisher, -1);
595
596         return uri;
597 }
598
599 int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
600 {
601         struct ast_sip_event_publisher_handler *existing;
602         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
603
604         if (!handler->start_publishing || !handler->stop_publishing) {
605                 ast_log(LOG_ERROR, "Handler does not implement required callbacks. Cannot register\n");
606                 return -1;
607         } else if (ast_strlen_zero(handler->event_name)) {
608                 ast_log(LOG_ERROR, "No event package specified for event publisher handler. Cannot register\n");
609                 return -1;
610         }
611
612         existing = find_publisher_handler_for_event_name(handler->event_name);
613         if (existing) {
614                 ast_log(LOG_ERROR, "Unable to register event publisher handler for event %s. "
615                                 "A handler is already registered\n", handler->event_name);
616                 return -1;
617         }
618
619         sub_add_handler(handler);
620
621         sip_outbound_publish_synchronize(NULL);
622
623         return 0;
624 }
625
626 void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
627 {
628         struct ast_sip_event_publisher_handler *iter;
629         SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
630         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publisher_handlers, iter, next) {
631                 if (handler == iter) {
632                         AST_RWLIST_REMOVE_CURRENT(next);
633                         ast_module_unref(ast_module_info->self);
634                         break;
635                 }
636         }
637         AST_RWLIST_TRAVERSE_SAFE_END;
638
639         sip_outbound_publish_synchronize(handler);
640 }
641
642 /*! \brief Destructor function for publish information */
643 static void sip_outbound_publish_destroy(void *obj)
644 {
645         struct ast_sip_outbound_publish *publish = obj;
646
647         ast_sip_auth_vector_destroy(&publish->outbound_auths);
648
649         ast_string_field_free_memory(publish);
650 }
651
652 /*! \brief Allocator function for publish information */
653 static void *sip_outbound_publish_alloc(const char *name)
654 {
655         struct ast_sip_outbound_publish *publish = ast_sorcery_generic_alloc(sizeof(*publish),
656                 sip_outbound_publish_destroy);
657
658         if (!publish || ast_string_field_init(publish, 256)) {
659                 ao2_cleanup(publish);
660                 return NULL;
661         }
662
663         return publish;
664 }
665
666 static void sip_outbound_publish_datastore_destroy(void *obj)
667 {
668         struct ast_datastore *datastore = obj;
669
670         /* Using the destroy function (if present) destroy the data */
671         if (datastore->info->destroy != NULL && datastore->data != NULL) {
672                 datastore->info->destroy(datastore->data);
673                 datastore->data = NULL;
674         }
675
676         ast_free((void *) datastore->uid);
677         datastore->uid = NULL;
678 }
679
680 struct ast_datastore *ast_sip_publish_client_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
681 {
682         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
683         const char *uid_ptr = uid;
684         char uuid_buf[AST_UUID_STR_LEN];
685
686         if (!info) {
687                 return NULL;
688         }
689
690         datastore = ao2_alloc(sizeof(*datastore), sip_outbound_publish_datastore_destroy);
691         if (!datastore) {
692                 return NULL;
693         }
694
695         datastore->info = info;
696         if (ast_strlen_zero(uid)) {
697                 /* They didn't provide an ID so we'll provide one ourself */
698                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
699         }
700
701         datastore->uid = ast_strdup(uid_ptr);
702         if (!datastore->uid) {
703                 return NULL;
704         }
705
706         ao2_ref(datastore, +1);
707         return datastore;
708 }
709
710 int ast_sip_publish_client_add_datastore(struct ast_sip_outbound_publish_client *client,
711         struct ast_datastore *datastore)
712 {
713         ast_assert(datastore != NULL);
714         ast_assert(datastore->info != NULL);
715         ast_assert(!ast_strlen_zero(datastore->uid));
716
717         if (!ao2_link(client->datastores, datastore)) {
718                 return -1;
719         }
720         return 0;
721 }
722
723 struct ast_datastore *ast_sip_publish_client_get_datastore(struct ast_sip_outbound_publish_client *client,
724         const char *name)
725 {
726         return ao2_find(client->datastores, name, OBJ_SEARCH_KEY);
727 }
728
729 void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_client *client,
730         const char *name)
731 {
732         ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
733 }
734
735 static int sip_publisher_service_queue(void *data)
736 {
737         RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup);
738         SCOPED_AO2LOCK(lock, publisher);
739         struct sip_outbound_publish_message *message;
740         pjsip_tx_data *tdata;
741         pj_status_t status;
742
743         if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) {
744                 return 0;
745         }
746
747         if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
748                 goto fatal;
749         }
750
751         if (!ast_strlen_zero(message->body.type) && !ast_strlen_zero(message->body.subtype) &&
752                 ast_sip_add_body(tdata, &message->body)) {
753                 pjsip_tx_data_dec_ref(tdata);
754                 goto fatal;
755         }
756
757         set_transport(publisher, tdata);
758
759         status = pjsip_publishc_send(publisher->client, tdata);
760         if (status == PJ_EBUSY) {
761                 /* We attempted to send the message but something else got there first */
762                 goto service;
763         } else if (status != PJ_SUCCESS) {
764                 goto fatal;
765         }
766
767         publisher->sending = message;
768
769         return 0;
770
771 fatal:
772         AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
773         ast_free(message);
774
775 service:
776         if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
777                 ao2_ref(publisher, -1);
778         }
779         return -1;
780 }
781
782 static int publisher_client_send(void *obj, void *arg, void *data, int flags)
783 {
784         struct sip_outbound_publisher *publisher = obj;
785         const struct ast_sip_body *body = arg;
786         struct sip_outbound_publish_message *message;
787         size_t type_len = 0, subtype_len = 0, body_text_len = 0;
788         int *res = data;
789
790         *res = -1;
791         if (!publisher->client) {
792                 return -1;
793         }
794
795         /* If a body is present we need more space for the contents of it */
796         if (body) {
797                 type_len = strlen(body->type) + 1;
798                 subtype_len = strlen(body->subtype) + 1;
799                 body_text_len = strlen(body->body_text) + 1;
800         }
801
802         message = ast_calloc(1, sizeof(*message) + type_len + subtype_len + body_text_len);
803         if (!message) {
804                 return -1;
805         }
806
807         if (body) {
808                 char *dst = message->body_contents;
809
810                 message->body.type = strcpy(dst, body->type);
811                 dst += type_len;
812                 message->body.subtype = strcpy(dst, body->subtype);
813                 dst += subtype_len;
814                 message->body.body_text = strcpy(dst, body->body_text);
815         }
816
817         AST_LIST_INSERT_TAIL(&publisher->queue, message, entry);
818
819         *res = ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher));
820         if (*res) {
821                 ao2_ref(publisher, -1);
822         }
823
824         return *res;
825 }
826
827 int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
828         const struct ast_sip_body *body)
829 {
830         SCOPED_AO2LOCK(lock, client);
831         int res = 0;
832
833         ao2_callback_data(client->publishers, OBJ_NODATA,
834                           publisher_client_send, (void *)body, &res);
835         return res;
836 }
837
838 static int sip_outbound_publisher_set_uri(
839         pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri)
840 {
841         pj_str_t tmp;
842         pjsip_uri *parsed;
843         pjsip_sip_uri *parsed_uri;
844         int size;
845
846         pj_strdup2_with_null(pool, &tmp, uri);
847         if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) {
848                 return -1;
849         }
850
851         if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
852                 return -1;
853         }
854
855         if (!ast_strlen_zero(user)) {
856                 pj_strdup2(pool, &parsed_uri->user, user);
857         }
858
859         res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size);
860         if (!res_uri->ptr) {
861                 return -1;
862         }
863
864         if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr,
865                                     pjsip_max_url_size - 1)) <= 0) {
866                 return -1;
867         }
868         res_uri->ptr[size] = '\0';
869         res_uri->slen = size;
870
871         return 0;
872 }
873
874 static int sip_outbound_publisher_set_uris(
875         pj_pool_t *pool, struct sip_outbound_publisher *publisher,
876         pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri)
877 {
878         struct ast_sip_outbound_publish *publish = publisher->owner->publish;
879
880         if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) {
881                 ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
882                         publish->server_uri, ast_sorcery_object_get_id(publish));
883                 return -1;
884         }
885
886         if (ast_strlen_zero(publish->to_uri)) {
887                 to_uri->ptr = server_uri->ptr;
888                 to_uri->slen = server_uri->slen;
889         } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) {
890                 ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
891                         publish->to_uri, ast_sorcery_object_get_id(publish));
892                 return -1;
893         }
894
895         publisher->to_uri = ast_strdup(to_uri->ptr);
896         if (!publisher->to_uri) {
897                 return -1;
898         }
899
900         if (ast_strlen_zero(publish->from_uri)) {
901                 from_uri->ptr = server_uri->ptr;
902                 from_uri->slen = server_uri->slen;
903         } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) {
904                 ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
905                         publish->from_uri, ast_sorcery_object_get_id(publish));
906                 return -1;
907         }
908
909         publisher->from_uri = ast_strdup(from_uri->ptr);
910         if (!publisher->from_uri) {
911                 return -1;
912         }
913
914         return 0;
915 }
916
917 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
918
919 /*! \brief Helper function that allocates a pjsip publish client and configures it */
920 static int sip_outbound_publisher_init(void *data)
921 {
922         struct sip_outbound_publisher *publisher = data;
923         RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
924         pjsip_publishc_opt opt = {
925                 .queue_request = PJ_FALSE,
926         };
927         pj_pool_t *pool;
928         pj_str_t event, server_uri, to_uri, from_uri;
929
930         if (publisher->client) {
931                 return 0;
932         }
933
934         if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt,
935                                   ao2_bump(publisher), sip_outbound_publish_callback,
936                 &publisher->client) != PJ_SUCCESS) {
937                 ao2_ref(publisher, -1);
938                 return -1;
939         }
940
941         publish = ao2_bump(publisher->owner->publish);
942
943         if (!ast_strlen_zero(publish->outbound_proxy)) {
944                 pjsip_route_hdr route_set, *route;
945                 static const pj_str_t ROUTE_HNAME = { "Route", 5 };
946
947                 pj_list_init(&route_set);
948
949                 if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME,
950                         (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
951                         pjsip_publishc_destroy(publisher->client);
952                         return -1;
953                 }
954                 pj_list_insert_nodes_before(&route_set, route);
955
956                 pjsip_publishc_set_route_set(publisher->client, &route_set);
957         }
958
959         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation",
960                                        pjsip_max_url_size, pjsip_max_url_size);
961         if (!pool) {
962                 ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
963                         ast_sorcery_object_get_id(publish));
964                 pjsip_publishc_destroy(publisher->client);
965                 return -1;
966         }
967
968         if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) {
969                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
970                 pjsip_publishc_destroy(publisher->client);
971                 return -1;
972         }
973
974         pj_cstr(&event, publish->event);
975         if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri,
976                                 publish->expiration != PJ_SUCCESS)) {
977                 ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n",
978                         ast_sorcery_object_get_id(publish));
979                 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
980                 pjsip_publishc_destroy(publisher->client);
981                 return -1;
982         }
983
984         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
985         return 0;
986 }
987
/*!
 * \internal
 * \brief ao2_callback() adapter around sip_outbound_publisher_init()
 *
 * \param obj The publisher to initialize
 * \param arg Unused
 * \param flags Unused
 *
 * \return The result of sip_outbound_publisher_init()
 */
static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags)
{
	struct sip_outbound_publisher *publisher = obj;

	return sip_outbound_publisher_init(publisher);
}
992
993 static int sip_outbound_publisher_reinit_all(void *data)
994 {
995         ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL);
996         return 0;
997 }
998
999 /*! \brief Destructor function for publish client */
1000 static void sip_outbound_publisher_destroy(void *obj)
1001 {
1002         struct sip_outbound_publisher *publisher = obj;
1003         struct sip_outbound_publish_message *message;
1004
1005         /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
1006
1007         while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
1008                 ast_free(message);
1009         }
1010
1011         ao2_cleanup(publisher->owner);
1012         ast_free(publisher->from_uri);
1013         ast_free(publisher->to_uri);
1014
1015         ast_taskprocessor_unreference(publisher->serializer);
1016 }
1017
1018 static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
1019         struct ast_sip_outbound_publish_client *client, const char *user)
1020 {
1021         struct sip_outbound_publisher *publisher;
1022         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1023
1024         publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1,
1025                               sip_outbound_publisher_destroy);
1026         if (!publisher) {
1027                 return NULL;
1028         }
1029
1030         /*
1031          * Bump the ref to the client. This essentially creates a circular reference,
1032          * but it is needed in order to make sure the client object doesn't get pulled
1033          * out from under us when the publisher stops publishing.
1034          *
1035          * The circular reference is alleviated by calling cancel_and_unpublish for
1036          * each client, from the state's destructor. By calling it there all references
1037          * to the publishers should go to zero, thus calling the publisher's destructor.
1038          * This in turn removes the client reference we added here. The state then removes
1039          * its reference to the client, which should take it to zero.
1040          */
1041         publisher->owner = ao2_bump(client);
1042         publisher->timer.user_data = publisher;
1043         publisher->timer.cb = sip_outbound_publish_timer_cb;
1044         if (user) {
1045                 strcpy(publisher->user, user);
1046         } else {
1047                 *publisher->user = '\0';
1048         }
1049
1050         ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
1051                 ast_sorcery_object_get_id(client->publish));
1052
1053         publisher->serializer = ast_sip_create_serializer_group(tps_name,
1054                 shutdown_group);
1055         if (!publisher->serializer) {
1056                 ao2_ref(publisher, -1);
1057                 return NULL;
1058         }
1059
1060         if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
1061                 ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
1062                         ast_sorcery_object_get_id(client->publish));
1063                 ao2_ref(publisher, -1);
1064                 return NULL;
1065         }
1066
1067         return publisher;
1068 }
1069
1070 static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
1071         struct ast_sip_outbound_publish_client *client, const char *user)
1072 {
1073         struct sip_outbound_publisher *publisher =
1074                 sip_outbound_publisher_alloc(client, user);
1075
1076         if (!publisher) {
1077                 return NULL;
1078         }
1079
1080         if (!ao2_link(client->publishers, publisher)) {
1081                 /*
1082                  * No need to bump the reference here. The task will take care of
1083                  * removing the reference.
1084                  */
1085                 if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, publisher)) {
1086                         ao2_ref(publisher, -1);
1087                 }
1088                 return NULL;
1089         }
1090
1091         return publisher;
1092 }
1093
/*!
 * \brief Queue a body for publishing on the publisher of a specific user
 *
 * \param client The publish client
 * \param user The user whose publisher should send the body
 * \param body The body to publish
 *
 * \retval -1 no publisher could be obtained for \a user
 * \return Otherwise the result stored by publisher_client_send()
 */
int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
	const char *user, const struct ast_sip_body *body)
{
	int status;
	struct sip_outbound_publisher *target =
		sip_outbound_publish_client_get_publisher(client, user);

	if (!target) {
		return -1;
	}

	/* publisher_client_send() writes the result before doing anything else */
	publisher_client_send(target, (void *)body, &status, 0);
	ao2_ref(target, -1);

	return status;
}
1109
1110 void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
1111                                    const char *user)
1112 {
1113         SCOPED_WRLOCK(lock, &load_lock);
1114         ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
1115 }
1116
1117 static int explicit_publish_destroy(void *data)
1118 {
1119         struct sip_outbound_publisher *publisher = data;
1120
1121         /*
1122          * If there is no pjsip publishing client then we obviously don't need
1123          * to destroy it. Also, the ref for the Asterisk publishing client that
1124          * pjsip had would not exist or should already be gone as well.
1125          */
1126         if (publisher->client) {
1127                 pjsip_publishc_destroy(publisher->client);
1128                 ao2_ref(publisher, -1);
1129         }
1130
1131         ao2_ref(publisher, -1);
1132
1133         return 0;
1134 }
1135
1136 /*! \brief Helper function which cancels and un-publishes a no longer used client */
1137 static int cancel_and_unpublish(void *obj, void *arg, int flags)
1138 {
1139         struct sip_outbound_publisher *publisher = obj;
1140         struct ast_sip_outbound_publish_client *client = publisher->owner;
1141
1142         SCOPED_AO2LOCK(lock, publisher);
1143
1144         if (!client->started) {
1145                 /* If the publisher was never started, there's nothing to unpublish, so just
1146                  * destroy the publication and remove its reference to the publisher.
1147                  */
1148                 if (ast_sip_push_task(publisher->serializer, explicit_publish_destroy, ao2_bump(publisher))) {
1149                         ao2_ref(publisher, -1);
1150                 }
1151                 return 0;
1152         }
1153
1154         if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, ao2_bump(publisher))) {
1155                 ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
1156                         ast_sorcery_object_get_id(client->publish));
1157                 ao2_ref(publisher, -1);
1158         }
1159
1160         /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
1161         if (!publisher->sending) {
1162                 if (ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
1163                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
1164                                 ast_sorcery_object_get_id(client->publish));
1165                         ao2_ref(publisher, -1);
1166                 }
1167         }
1168         publisher->destroy = 1;
1169         return 0;
1170 }
1171
1172 /*! \brief Destructor function for publish client */
1173 static void sip_outbound_publish_client_destroy(void *obj)
1174 {
1175         struct ast_sip_outbound_publish_client *client = obj;
1176
1177         ao2_cleanup(client->datastores);
1178
1179         /*
1180          * The client's publishers have already been unpublished and destroyed
1181          * by this point, so it is safe to finally remove the reference to the
1182          * publish object. The client needed to hold a reference to it until
1183          * the publishers were done with it.
1184          */
1185         ao2_cleanup(client->publish);
1186 }
1187
1188 /*! \brief Destructor function for publish state */
1189 static void sip_outbound_publish_state_destroy(void *obj)
1190 {
1191         struct ast_sip_outbound_publish_state *state = obj;
1192
1193         stop_publishing(state->client, NULL);
1194         /*
1195          * Since the state is being destroyed the associated client needs to also
1196          * be destroyed. However simply removing the reference to the client will
1197          * not initiate client destruction since the client's publisher(s) hold a
1198          * reference to the client object as well. So we need to unpublish the
1199          * the client's publishers here, which will remove the publisher's client
1200          * reference during that process.
1201          *
1202          * That being said we don't want to remove the client's reference to the
1203          * publish object just yet. We'll hold off on that until client destruction
1204          * itself. This is because the publishers need access to the client's
1205          * publish object while they are unpublishing.
1206          */
1207         ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL);
1208         ao2_cleanup(state->client->publishers);
1209
1210         state->client->started = 0;
1211         ao2_cleanup(state->client);
1212 }
1213
1214 /*!
1215  * \internal
1216  * \brief Check if a publish can be reused
1217  *
1218  * This checks if the existing outbound publish's configuration differs from a newly-applied
1219  * outbound publish.
1220  *
1221  * \param existing The pre-existing outbound publish
1222  * \param applied The newly-created publish
1223  */
1224 static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct ast_sip_outbound_publish *applied)
1225 {
1226         int i;
1227
1228         if (strcmp(existing->server_uri, applied->server_uri) || strcmp(existing->from_uri, applied->from_uri) ||
1229                 strcmp(existing->to_uri, applied->to_uri) || strcmp(existing->outbound_proxy, applied->outbound_proxy) ||
1230                 strcmp(existing->event, applied->event) ||
1231                 AST_VECTOR_SIZE(&existing->outbound_auths) != AST_VECTOR_SIZE(&applied->outbound_auths)) {
1232                 return 0;
1233         }
1234
1235         for (i = 0; i < AST_VECTOR_SIZE(&existing->outbound_auths); ++i) {
1236                 if (strcmp(AST_VECTOR_GET(&existing->outbound_auths, i), AST_VECTOR_GET(&applied->outbound_auths, i))) {
1237                         return 0;
1238                 }
1239         }
1240
1241         return 1;
1242 }
1243
1244 /*! \brief Callback function for publish client responses */
1245 static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
1246 {
1247 #define DESTROY_CLIENT() do {                      \
1248         pjsip_publishc_destroy(publisher->client); \
1249         publisher->client = NULL; \
1250         ao2_ref(publisher, -1); } while (0)
1251
1252         RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup);
1253         RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup);
1254         SCOPED_AO2LOCK(lock, publisher);
1255         pjsip_tx_data *tdata;
1256
1257         if (publisher->destroy) {
1258                 if (publisher->sending) {
1259                         publisher->sending = NULL;
1260
1261                         if (!ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) {
1262                                 return;
1263                         }
1264                         ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
1265                                 ast_sorcery_object_get_id(publish));
1266                         ao2_ref(publisher, -1);
1267                 }
1268                 /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */
1269                 DESTROY_CLIENT();
1270                 return;
1271         }
1272
1273         if (param->code == 401 || param->code == 407) {
1274                 pjsip_transaction *tsx = pjsip_rdata_get_tsx(param->rdata);
1275
1276                 if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
1277                                 param->rdata, tsx->last_tx, &tdata)) {
1278                         set_transport(publisher, tdata);
1279                         pjsip_publishc_send(publisher->client, tdata);
1280                 }
1281                 publisher->auth_attempts++;
1282
1283                 if (publisher->auth_attempts == publish->max_auth_attempts) {
1284                         DESTROY_CLIENT();
1285                         ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
1286                                 ast_sorcery_object_get_id(publish));
1287
1288                         goto end;
1289                 }
1290                 return;
1291         }
1292
1293         publisher->auth_attempts = 0;
1294
1295         if (param->code == 412) {
1296                 DESTROY_CLIENT();
1297                 if (sip_outbound_publisher_init(publisher)) {
1298                         ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
1299                                 ast_sorcery_object_get_id(publish));
1300                         goto end;
1301                 }
1302
1303                 /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
1304                 publisher->sending = NULL;
1305         } else if (param->code == 423) {
1306                 /* Update the expiration with the new expiration time if available */
1307                 pjsip_expires_hdr *expires;
1308
1309                 expires = pjsip_msg_find_hdr(param->rdata->msg_info.msg, PJSIP_H_MIN_EXPIRES, NULL);
1310                 if (!expires || !expires->ivalue) {
1311                         DESTROY_CLIENT();
1312                         ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
1313                                 ast_sorcery_object_get_id(publish));
1314                         goto end;
1315                 }
1316
1317                 pjsip_publishc_update_expires(publisher->client, expires->ivalue);
1318                 publisher->sending = NULL;
1319         } else if (publisher->sending) {
1320                 /* Remove the message currently being sent so that when the queue is serviced another will get sent */
1321                 AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
1322                 ast_free(publisher->sending);
1323                 publisher->sending = NULL;
1324                 if (!param->rdata) {
1325                         ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
1326                                 ast_sorcery_object_get_id(publish));
1327                 }
1328         }
1329
1330         if (AST_LIST_EMPTY(&publisher->queue)) {
1331                 schedule_publish_refresh(publisher, param->expiration);
1332         }
1333
1334 end:
1335         if (!publisher->client) {
1336                 struct sip_outbound_publish_message *message;
1337
1338                 while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
1339                         ast_free(message);
1340                 }
1341         } else {
1342                 if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) {
1343                         ao2_ref(publisher, -1);
1344                 }
1345         }
1346 }
1347
1348 #define DATASTORE_BUCKETS 53
1349
1350 static int datastore_hash(const void *obj, int flags)
1351 {
1352         const struct ast_datastore *datastore;
1353         const char *uid;
1354
1355         switch (flags & OBJ_SEARCH_MASK) {
1356         case OBJ_SEARCH_KEY:
1357                 uid = obj;
1358                 break;
1359         case OBJ_SEARCH_OBJECT:
1360                 datastore = obj;
1361                 uid = datastore->uid;
1362                 break;
1363         default:
1364                 /* Hash can only work on something with a full key. */
1365                 ast_assert(0);
1366                 return 0;
1367         }
1368
1369         return ast_str_hash(uid);
1370 }
1371
1372 static int datastore_cmp(void *obj, void *arg, int flags)
1373 {
1374         const struct ast_datastore *object_left = obj;
1375         const struct ast_datastore *object_right = arg;
1376         const char *right_key = arg;
1377         int cmp;
1378
1379         switch (flags & OBJ_SEARCH_MASK) {
1380         case OBJ_SEARCH_OBJECT:
1381                 right_key = object_right->uid;
1382                 /* Fall through */
1383         case OBJ_SEARCH_KEY:
1384                 cmp = strcmp(object_left->uid, right_key);
1385                 break;
1386         case OBJ_SEARCH_PARTIAL_KEY:
1387         cmp = strncmp(object_left->uid, right_key, strlen(right_key));
1388                 break;
1389         default:
1390                 /*
1391                  * What arg points to is specific to this traversal callback
1392                  * and has no special meaning to astobj2.
1393                  */
1394                 cmp = 0;
1395                 break;
1396         }
1397         if (cmp) {
1398                 return 0;
1399         }
1400         /*
1401          * At this point the traversal callback is identical to a sorted
1402          * container.
1403          */
1404         return CMP_MATCH;
1405 }
1406
1407 /*! \brief Allocator function for publish client */
1408 static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
1409         struct ast_sip_outbound_publish *publish)
1410 {
1411         const char *id = ast_sorcery_object_get_id(publish);
1412         struct ast_sip_outbound_publish_state *state =
1413                 ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
1414
1415         if (!state) {
1416                 return NULL;
1417         }
1418
1419         state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
1420         if (!state->client) {
1421                 ao2_ref(state, -1);
1422                 return NULL;
1423         }
1424
1425         state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1426         if (!state->client->datastores) {
1427                 ao2_ref(state, -1);
1428                 return NULL;
1429         }
1430
1431         state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn,
1432                                                         sip_outbound_publisher_cmp_fn);
1433         if (!state->client->publishers) {
1434                 ao2_ref(state, -1);
1435                 return NULL;
1436         }
1437
1438         state->client->publish = ao2_bump(publish);
1439
1440         strcpy(state->id, id);
1441         return state;
1442 }
1443
1444 static int validate_publish_config(struct ast_sip_outbound_publish *publish)
1445 {
1446         if (ast_strlen_zero(publish->server_uri)) {
1447                 ast_log(LOG_ERROR, "No server URI specified on outbound publish '%s'\n",
1448                         ast_sorcery_object_get_id(publish));
1449                 return -1;
1450         } else if (ast_strlen_zero(publish->event)) {
1451                 ast_log(LOG_ERROR, "No event type specified for outbound publish '%s'\n",
1452                         ast_sorcery_object_get_id(publish));
1453                 return -1;
1454         }
1455         return 0;
1456 }
1457
1458 static int current_state_reusable(struct ast_sip_outbound_publish *publish,
1459                                   struct ast_sip_outbound_publish_state *current_state)
1460 {
1461         struct ast_sip_outbound_publish *old_publish;
1462
1463         /*
1464          * Don't maintain the old state/client objects if the multi_user option changed.
1465          */
1466         if ((!publish->multi_user && current_state->client->publish->multi_user) ||
1467             (publish->multi_user && !current_state->client->publish->multi_user)) {
1468                 return 0;
1469         }
1470
1471
1472         if (!can_reuse_publish(current_state->client->publish, publish)) {
1473                 /*
1474                  * Something significant has changed in the configuration, so we are
1475                  * unable to use the old state object. The current state needs to go
1476                  * away and a new one needs to be created.
1477                  */
1478                 return 0;
1479         }
1480
1481         /*
1482          * We can reuse the current state object so keep it, but swap out the
1483          * underlying publish object with the new one.
1484          */
1485         old_publish = current_state->client->publish;
1486         current_state->client->publish = publish;
1487         if (ast_sip_push_task_synchronous(
1488                     NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
1489                 /*
1490                  * If the state object fails to re-initialize then swap
1491                  * the old publish info back in.
1492                  */
1493                 current_state->client->publish = publish;
1494                 ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n",
1495                         ast_sorcery_object_get_id(current_state->client->publish));
1496                 return -1;
1497         }
1498
1499         /*
1500          * Since we swapped out the publish object the new one needs a ref
1501          * while the old one needs to go away.
1502          */
1503         ao2_ref(current_state->client->publish, +1);
1504         ao2_cleanup(old_publish);
1505
1506         /* Tell the caller that the current state object should be used */
1507         return 1;
1508 }
1509
1510 /*! \brief Apply function which finds or allocates a state structure */
1511 static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
1512 {
1513 #define ADD_TO_NEW_STATES(__obj) \
1514         do { if (__obj) { \
1515              ao2_link(new_states, __obj); \
1516              ao2_ref(__obj, -1); } } while (0)
1517
1518         struct ast_sip_outbound_publish *applied = obj;
1519         struct ast_sip_outbound_publish_state *current_state, *new_state;
1520         struct sip_outbound_publisher *publisher = NULL;
1521         int res;
1522
1523         /*
1524          * New states are being loaded or reloaded. We'll need to add the new
1525          * object if created/updated, or keep the old object if an error occurs.
1526          */
1527         if (!new_states) {
1528                 new_states = ao2_container_alloc_options(
1529                         AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
1530                         outbound_publish_state_hash, outbound_publish_state_cmp);
1531
1532                 if (!new_states) {
1533                         ast_log(LOG_ERROR, "Unable to allocate new states container\n");
1534                         return -1;
1535                 }
1536         }
1537
1538         /* If there is current state we'll want to maintain it if any errors occur */
1539         current_state = sip_publish_state_get(ast_sorcery_object_get_id(applied));
1540
1541         if ((res = validate_publish_config(applied))) {
1542                 ADD_TO_NEW_STATES(current_state);
1543                 return res;
1544         }
1545
1546         if (current_state && (res = current_state_reusable(applied, current_state))) {
1547                 /*
1548                  * The current state object was able to be reused, or an error
1549                  * occurred. Either way we keep the current state and be done.
1550                  */
1551                 ADD_TO_NEW_STATES(current_state);
1552                 return res == 1 ? 0 : -1;
1553         }
1554
1555         /*
1556          * No current state was found or it was unable to be reused. Either way
1557          * we'll need to create a new state object.
1558          */
1559         new_state = sip_outbound_publish_state_alloc(applied);
1560         if (!new_state) {
1561                 ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
1562                         ast_sorcery_object_get_id(applied));
1563                 ADD_TO_NEW_STATES(current_state);
1564                 return -1;
1565         };
1566
1567         if (!applied->multi_user &&
1568             !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) {
1569                 ADD_TO_NEW_STATES(current_state);
1570                 ao2_ref(new_state, -1);
1571                 return -1;
1572         }
1573         ao2_cleanup(publisher);
1574
1575         ADD_TO_NEW_STATES(new_state);
1576         ao2_cleanup(current_state);
1577         return res;
1578 }
1579
1580 static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
1581 {
1582         struct ast_sip_outbound_publish *publish = obj;
1583
1584         return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
1585 }
1586
1587
1588 static int unload_module(void)
1589 {
1590         int remaining;
1591
1592         ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
1593
1594         ao2_global_obj_release(current_states);
1595
1596         /* Wait for publication serializers to get destroyed. */
1597         ast_debug(2, "Waiting for publication to complete for unload.\n");
1598         remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
1599         if (remaining) {
1600                 ast_log(LOG_WARNING, "Unload incomplete.  Could not stop %d outbound publications.  Try again later.\n",
1601                         remaining);
1602                 return -1;
1603         }
1604
1605         ast_debug(2, "Successful shutdown.\n");
1606
1607         ao2_cleanup(shutdown_group);
1608         shutdown_group = NULL;
1609
1610         return 0;
1611 }
1612
1613 static int load_module(void)
1614 {
1615         CHECK_PJSIP_MODULE_LOADED();
1616
1617         /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
1618         ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size);
1619
1620         shutdown_group = ast_serializer_shutdown_group_alloc();
1621         if (!shutdown_group) {
1622                 return AST_MODULE_LOAD_FAILURE;
1623         }
1624
1625         ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
1626         ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
1627
1628         if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
1629                 sip_outbound_publish_apply)) {
1630                 ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
1631                 unload_module();
1632                 return AST_MODULE_LOAD_DECLINE;
1633         }
1634
1635         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
1636         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
1637         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
1638         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "event", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, event));
1639         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "to_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, to_uri));
1640         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, outbound_proxy));
1641         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
1642         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
1643         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
1644         ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
1645         ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
1646
1647         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1648
1649         AST_RWLIST_RDLOCK(&publisher_handlers);
1650         sip_outbound_publish_synchronize(NULL);
1651         AST_RWLIST_UNLOCK(&publisher_handlers);
1652
1653         pjsip_publishc_init_module(ast_sip_get_pjsip_endpoint());
1654
1655         return AST_MODULE_LOAD_SUCCESS;
1656 }
1657
1658 static int reload_module(void)
1659 {
1660         ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
1661
1662         AST_RWLIST_RDLOCK(&publisher_handlers);
1663         sip_outbound_publish_synchronize(NULL);
1664         AST_RWLIST_UNLOCK(&publisher_handlers);
1665         return 0;
1666 }
1667
/*
 * Module registration: wires up the load, reload and unload entry points
 * of this file and sets the load priority to AST_MODPRI_CHANNEL_DEPEND.
 */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
        .load = load_module,
        .reload = reload_module,
        .unload = unload_module,
        .load_pri = AST_MODPRI_CHANNEL_DEPEND,
);