res_pjsip: Log IPv6 addresses correctly
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "asterisk/acl.h"
25 #include "include/res_pjsip_private.h"
26 #include "asterisk/taskprocessor.h"
27 #include "asterisk/threadpool.h"
28 #include "asterisk/res_pjsip_cli.h"
29
30 static int distribute(void *data);
31 static pj_bool_t distributor(pjsip_rx_data *rdata);
32 static pj_status_t record_serializer(pjsip_tx_data *tdata);
33
34 static pjsip_module distributor_mod = {
35         .name = {"Request Distributor", 19},
36         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
37         .on_tx_request = record_serializer,
38         .on_rx_request = distributor,
39         .on_rx_response = distributor,
40 };
41
42 struct ast_sched_context *prune_context;
43
44 /* From the auth/realm realtime column size */
45 #define MAX_REALM_LENGTH 40
46
47 #define DEFAULT_SUSPECTS_BUCKETS 53
48
49 static struct ao2_container *unidentified_requests;
50 static unsigned int unidentified_count;
51 static unsigned int unidentified_period;
52 static unsigned int unidentified_prune_interval;
53 static int using_auth_username;
54
55 struct unidentified_request{
56         struct timeval first_seen;
57         int count;
58         char src_name[];
59 };
60
61 /*! Number of serializers in pool if one not otherwise known.  (Best if prime number) */
62 #define DISTRIBUTOR_POOL_SIZE           31
63
64 /*! Pool of serializers to use if not supplied. */
65 static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
66
67 /*!
68  * \internal
69  * \brief Record the task's serializer name on the tdata structure.
70  * \since 14.0.0
71  *
72  * \param tdata The outgoing message.
73  *
74  * \retval PJ_SUCCESS.
75  */
76 static pj_status_t record_serializer(pjsip_tx_data *tdata)
77 {
78         struct ast_taskprocessor *serializer;
79
80         serializer = ast_threadpool_serializer_get_current();
81         if (serializer) {
82                 const char *name;
83
84                 name = ast_taskprocessor_name(serializer);
85                 if (!ast_strlen_zero(name)
86                         && (!tdata->mod_data[distributor_mod.id]
87                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
88                         char *tdata_name;
89
90                         /* The serializer in use changed. */
91                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
92                         strcpy(tdata_name, name);/* Safe */
93
94                         tdata->mod_data[distributor_mod.id] = tdata_name;
95                 }
96         }
97
98         return PJ_SUCCESS;
99 }
100
101 /*!
102  * \internal
103  * \brief Find the request tdata to get the serializer it used.
104  * \since 14.0.0
105  *
106  * \param rdata The incoming message.
107  *
108  * \retval serializer on success.
109  * \retval NULL on error or could not find the serializer.
110  */
111 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
112 {
113         struct ast_taskprocessor *serializer = NULL;
114         pj_str_t tsx_key;
115         pjsip_transaction *tsx;
116
117         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
118                 &rdata->msg_info.cseq->method, rdata);
119
120         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
121         if (!tsx) {
122                 ast_debug(1, "Could not find transaction for %s.\n",
123                         pjsip_rx_data_get_info(rdata));
124                 return NULL;
125         }
126         ast_debug(3, "Found transaction %s for %s.\n",
127                 tsx->obj_name, pjsip_rx_data_get_info(rdata));
128
129         if (tsx->last_tx) {
130                 const char *serializer_name;
131
132                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
133                 if (!ast_strlen_zero(serializer_name)) {
134                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
135                         if (serializer) {
136                                 ast_debug(3, "Found serializer %s on transaction %s\n",
137                                                 serializer_name, tsx->obj_name);
138                         }
139                 }
140         }
141
142 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
143         pj_grp_lock_release(tsx->grp_lock);
144 #else
145         pj_mutex_unlock(tsx->mutex);
146 #endif
147
148         return serializer;
149 }
150
151 /*! Dialog-specific information the distributor uses */
152 struct distributor_dialog_data {
153         /*! dialog_associations ao2 container key */
154         pjsip_dialog *dlg;
155         /*! Serializer to distribute tasks to for this dialog */
156         struct ast_taskprocessor *serializer;
157         /*! Endpoint associated with this dialog */
158         struct ast_sip_endpoint *endpoint;
159 };
160
161 #define DIALOG_ASSOCIATIONS_BUCKETS 251
162
163 static struct ao2_container *dialog_associations;
164
165 /*!
166  * \internal
167  * \brief Compute a hash value on an arbitrary buffer.
168  * \since 13.17.0
169  *
170  * \param[in] pos The buffer to add to the hash
171  * \param[in] len The buffer length to add to the hash
172  * \param[in] hash The hash value to add to
173  *
174  * \details
175  * This version of the function is for when you need to compute a
176  * hash of more than one buffer.
177  *
178  * This famous hash algorithm was written by Dan Bernstein and is
179  * commonly used.
180  *
181  * \sa http://www.cse.yorku.ca/~oz/hash.html
182  */
183 static int buf_hash_add(const char *pos, size_t len, int hash)
184 {
185         while (len--) {
186                 hash = hash * 33 ^ *pos++;
187         }
188
189         return hash;
190 }
191
192 /*!
193  * \internal
194  * \brief Compute a hash value on an arbitrary buffer.
195  * \since 13.17.0
196  *
197  * \param[in] pos The buffer to add to the hash
198  * \param[in] len The buffer length to add to the hash
199  *
200  * \details
201  * This version of the function is for when you need to compute a
202  * hash of more than one buffer.
203  *
204  * This famous hash algorithm was written by Dan Bernstein and is
205  * commonly used.
206  *
207  * \sa http://www.cse.yorku.ca/~oz/hash.html
208  */
209 static int buf_hash(const char *pos, size_t len)
210 {
211         return buf_hash_add(pos, len, 5381);
212 }
213
214 static int dialog_associations_hash(const void *obj, int flags)
215 {
216         const struct distributor_dialog_data *object;
217         union {
218                 const pjsip_dialog *dlg;
219                 const char buf[sizeof(pjsip_dialog *)];
220         } key;
221
222         switch (flags & OBJ_SEARCH_MASK) {
223         case OBJ_SEARCH_KEY:
224                 key.dlg = obj;
225                 break;
226         case OBJ_SEARCH_OBJECT:
227                 object = obj;
228                 key.dlg = object->dlg;
229                 break;
230         default:
231                 /* Hash can only work on something with a full key. */
232                 ast_assert(0);
233                 return 0;
234         }
235         return ast_str_hash_restrict(buf_hash(key.buf, sizeof(key.buf)));
236 }
237
238 static int dialog_associations_cmp(void *obj, void *arg, int flags)
239 {
240         const struct distributor_dialog_data *object_left = obj;
241         const struct distributor_dialog_data *object_right = arg;
242         const pjsip_dialog *right_key = arg;
243         int cmp = 0;
244
245         switch (flags & OBJ_SEARCH_MASK) {
246         case OBJ_SEARCH_OBJECT:
247                 right_key = object_right->dlg;
248                 /* Fall through */
249         case OBJ_SEARCH_KEY:
250                 if (object_left->dlg == right_key) {
251                         cmp = CMP_MATCH;
252                 }
253                 break;
254         case OBJ_SEARCH_PARTIAL_KEY:
255                 /* There is no such thing for this container. */
256                 ast_assert(0);
257                 break;
258         default:
259                 cmp = 0;
260                 break;
261         }
262         return cmp;
263 }
264
265 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
266 {
267         struct distributor_dialog_data *dist;
268
269         ao2_wrlock(dialog_associations);
270         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
271         if (!dist) {
272                 if (serializer) {
273                         dist = ao2_alloc(sizeof(*dist), NULL);
274                         if (dist) {
275                                 dist->dlg = dlg;
276                                 dist->serializer = serializer;
277                                 ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
278                                 ao2_ref(dist, -1);
279                         }
280                 }
281         } else {
282                 ao2_lock(dist);
283                 dist->serializer = serializer;
284                 if (!dist->serializer && !dist->endpoint) {
285                         ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
286                 }
287                 ao2_unlock(dist);
288                 ao2_ref(dist, -1);
289         }
290         ao2_unlock(dialog_associations);
291 }
292
293 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
294 {
295         struct distributor_dialog_data *dist;
296
297         ao2_wrlock(dialog_associations);
298         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
299         if (!dist) {
300                 if (endpoint) {
301                         dist = ao2_alloc(sizeof(*dist), NULL);
302                         if (dist) {
303                                 dist->dlg = dlg;
304                                 dist->endpoint = endpoint;
305                                 ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
306                                 ao2_ref(dist, -1);
307                         }
308                 }
309         } else {
310                 ao2_lock(dist);
311                 dist->endpoint = endpoint;
312                 if (!dist->serializer && !dist->endpoint) {
313                         ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
314                 }
315                 ao2_unlock(dist);
316                 ao2_ref(dist, -1);
317         }
318         ao2_unlock(dialog_associations);
319 }
320
321 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
322 {
323         struct distributor_dialog_data *dist;
324         struct ast_sip_endpoint *endpoint;
325
326         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
327         if (dist) {
328                 ao2_lock(dist);
329                 endpoint = ao2_bump(dist->endpoint);
330                 ao2_unlock(dist);
331                 ao2_ref(dist, -1);
332         } else {
333                 endpoint = NULL;
334         }
335         return endpoint;
336 }
337
338 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
339 {
340         pj_str_t tsx_key;
341         pjsip_transaction *tsx;
342         pjsip_dialog *dlg;
343         pj_str_t *local_tag;
344         pj_str_t *remote_tag;
345
346         if (!rdata->msg_info.msg) {
347                 return NULL;
348         }
349
350         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
351                 local_tag = &rdata->msg_info.to->tag;
352                 remote_tag = &rdata->msg_info.from->tag;
353         } else {
354                 local_tag = &rdata->msg_info.from->tag;
355                 remote_tag = &rdata->msg_info.to->tag;
356         }
357
358         /* We can only call the convenient method for
359          *  1) responses
360          *  2) non-CANCEL requests
361          *  3) CANCEL requests with a to-tag
362          */
363         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
364                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
365                         rdata->msg_info.to->tag.slen != 0) {
366                 dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
367                                 remote_tag, PJ_FALSE);
368                 if (dlg) {
369                         return dlg;
370                 }
371         }
372
373         /*
374          * There may still be a matching dialog if this is
375          * 1) an incoming CANCEL request without a to-tag
376          * 2) an incoming response to a dialog-creating request.
377          */
378         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
379                 /* CANCEL requests will need to match the INVITE we initially received. Any
380                  * other request type will either have been matched already or is not in
381                  * dialog
382                  */
383                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
384                                 pjsip_get_invite_method(), rdata);
385         } else {
386                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
387                                 &rdata->msg_info.cseq->method, rdata);
388         }
389
390         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
391         if (!tsx) {
392                 ast_debug(3, "Could not find matching transaction for %s\n",
393                         pjsip_rx_data_get_info(rdata));
394                 return NULL;
395         }
396
397         dlg = pjsip_tsx_get_dlg(tsx);
398
399 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
400         pj_grp_lock_release(tsx->grp_lock);
401 #else
402         pj_mutex_unlock(tsx->mutex);
403 #endif
404
405         return dlg;
406 }
407
408 /*!
409  * \internal
410  * \brief Compute a hash value on a pjlib string
411  * \since 13.10.0
412  *
413  * \param[in] str The pjlib string to add to the hash
414  * \param[in] hash The hash value to add to
415  *
416  * \details
417  * This version of the function is for when you need to compute a
418  * string hash of more than one string.
419  *
420  * This famous hash algorithm was written by Dan Bernstein and is
421  * commonly used.
422  *
423  * \sa http://www.cse.yorku.ca/~oz/hash.html
424  */
425 static int pjstr_hash_add(pj_str_t *str, int hash)
426 {
427         return buf_hash_add(pj_strbuf(str), pj_strlen(str), hash);
428 }
429
430 /*!
431  * \internal
432  * \brief Compute a hash value on a pjlib string
433  * \since 13.10.0
434  *
435  * \param[in] str The pjlib string to hash
436  *
437  * This famous hash algorithm was written by Dan Bernstein and is
438  * commonly used.
439  *
440  * http://www.cse.yorku.ca/~oz/hash.html
441  */
442 static int pjstr_hash(pj_str_t *str)
443 {
444         return pjstr_hash_add(str, 5381);
445 }
446
447 struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
448 {
449         int hash;
450         pj_str_t *remote_tag;
451         struct ast_taskprocessor *serializer;
452
453         if (!rdata->msg_info.msg) {
454                 return NULL;
455         }
456
457         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
458                 remote_tag = &rdata->msg_info.from->tag;
459         } else {
460                 remote_tag = &rdata->msg_info.to->tag;
461         }
462
463         /* Compute the hash from the SIP message call-id and remote-tag */
464         hash = pjstr_hash(&rdata->msg_info.cid->id);
465         hash = pjstr_hash_add(remote_tag, hash);
466         hash = ast_str_hash_restrict(hash);
467
468         serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
469         if (serializer) {
470                 ast_debug(3, "Calculated serializer %s to use for %s\n",
471                         ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
472         }
473         return serializer;
474 }
475
476 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
477
478 static pjsip_module endpoint_mod = {
479         .name = {"Endpoint Identifier", 19},
480         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
481         .on_rx_request = endpoint_lookup,
482 };
483
484 static pj_bool_t distributor(pjsip_rx_data *rdata)
485 {
486         pjsip_dialog *dlg;
487         struct distributor_dialog_data *dist = NULL;
488         struct ast_taskprocessor *serializer = NULL;
489         pjsip_rx_data *clone;
490
491         if (!ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
492                 /*
493                  * Ignore everything until we are fully booted.  Let the
494                  * peer retransmit messages until we are ready.
495                  */
496                 return PJ_TRUE;
497         }
498
499         dlg = find_dialog(rdata);
500         if (dlg) {
501                 ast_debug(3, "Searching for serializer associated with dialog %s for %s\n",
502                         dlg->obj_name, pjsip_rx_data_get_info(rdata));
503                 dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
504                 if (dist) {
505                         ao2_lock(dist);
506                         serializer = ao2_bump(dist->serializer);
507                         ao2_unlock(dist);
508                         if (serializer) {
509                                 ast_debug(3, "Found serializer %s associated with dialog %s\n",
510                                         ast_taskprocessor_name(serializer), dlg->obj_name);
511                         }
512                 }
513         }
514
515         if (serializer) {
516                 /* We have a serializer so we know where to send the message. */
517         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
518                 ast_debug(3, "No dialog serializer for %s.  Using request transaction as basis.\n",
519                         pjsip_rx_data_get_info(rdata));
520                 serializer = find_request_serializer(rdata);
521                 if (!serializer) {
522                         /*
523                          * Pick a serializer for the unmatched response.
524                          * We couldn't determine what serializer originally
525                          * sent the request or the serializer is gone.
526                          */
527                         serializer = ast_sip_get_distributor_serializer(rdata);
528                 }
529         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
530                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
531                 /* We have a BYE or CANCEL request without a serializer. */
532                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
533                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
534                 ao2_cleanup(dist);
535                 return PJ_TRUE;
536         } else {
537                 if (ast_taskprocessor_alert_get()) {
538                         /*
539                          * When taskprocessors get backed up, there is a good chance that
540                          * we are being overloaded and need to defer adding new work to
541                          * the system.  To defer the work we will ignore the request and
542                          * rely on the peer's transport layer to retransmit the message.
543                          * We usually work off the overload within a few seconds.  The
544                          * alternative is to send back a 503 response to these requests
545                          * and be done with it.
546                          */
547                         ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
548                                 pjsip_rx_data_get_info(rdata));
549                         ao2_cleanup(dist);
550                         return PJ_TRUE;
551                 }
552
553                 /* Pick a serializer for the out-of-dialog request. */
554                 serializer = ast_sip_get_distributor_serializer(rdata);
555         }
556
557         if (pjsip_rx_data_clone(rdata, 0, &clone) != PJ_SUCCESS) {
558                 ast_taskprocessor_unreference(serializer);
559                 ao2_cleanup(dist);
560                 return PJ_TRUE;
561         }
562
563         if (dist) {
564                 ao2_lock(dist);
565                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
566                 ao2_unlock(dist);
567                 ao2_cleanup(dist);
568         }
569
570         if (ast_sip_push_task(serializer, distribute, clone)) {
571                 ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
572                 pjsip_rx_data_free_cloned(clone);
573         }
574
575         ast_taskprocessor_unreference(serializer);
576
577         return PJ_TRUE;
578 }
579
580 static struct ast_sip_auth *alloc_artificial_auth(char *default_realm)
581 {
582         struct ast_sip_auth *fake_auth;
583
584         fake_auth = ast_sorcery_alloc(ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE,
585                 "artificial");
586         if (!fake_auth) {
587                 return NULL;
588         }
589
590         ast_string_field_set(fake_auth, realm, default_realm);
591         ast_string_field_set(fake_auth, auth_user, "");
592         ast_string_field_set(fake_auth, auth_pass, "");
593         fake_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
594
595         return fake_auth;
596 }
597
598 static AO2_GLOBAL_OBJ_STATIC(artificial_auth);
599
600 static int create_artificial_auth(void)
601 {
602         char default_realm[MAX_REALM_LENGTH + 1];
603         struct ast_sip_auth *fake_auth;
604
605         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
606         fake_auth = alloc_artificial_auth(default_realm);
607         if (!fake_auth) {
608                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
609                 return -1;
610         }
611
612         ao2_global_obj_replace_unref(artificial_auth, fake_auth);
613         ao2_ref(fake_auth, -1);
614         return 0;
615 }
616
617 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
618 {
619         return ao2_global_obj_ref(artificial_auth);
620 }
621
622 static struct ast_sip_endpoint *artificial_endpoint = NULL;
623
624 static int create_artificial_endpoint(void)
625 {
626         artificial_endpoint = ast_sorcery_alloc(ast_sip_get_sorcery(), "endpoint", NULL);
627         if (!artificial_endpoint) {
628                 return -1;
629         }
630
631         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
632         /* Pushing a bogus value into the vector will ensure that
633          * the proper size of the vector is returned. This value is
634          * not actually used anywhere
635          */
636         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
637         return 0;
638 }
639
640 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
641 {
642         ao2_ref(artificial_endpoint, +1);
643         return artificial_endpoint;
644 }
645
646 static void log_failed_request(pjsip_rx_data *rdata, char *msg, unsigned int count, unsigned int period)
647 {
648         char from_buf[PJSIP_MAX_URL_SIZE];
649         char callid_buf[PJSIP_MAX_URL_SIZE];
650         char method_buf[PJSIP_MAX_URL_SIZE];
651         char src_addr_buf[AST_SOCKADDR_BUFLEN];
652         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
653         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
654         ast_copy_pj_str(method_buf, &rdata->msg_info.msg->line.req.method.name, PJSIP_MAX_URL_SIZE);
655         if (count) {
656                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s' (callid: %s) - %s"
657                         " after %u tries in %.3f ms\n",
658                         method_buf, from_buf,
659                         pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buf, sizeof(src_addr_buf), 3),
660                         callid_buf, msg, count, period / 1000.0);
661         } else {
662                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s' (callid: %s) - %s\n",
663                         method_buf, from_buf,
664                         pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buf, sizeof(src_addr_buf), 3),
665                         callid_buf, msg);
666         }
667 }
668
669 static void check_endpoint(pjsip_rx_data *rdata, struct unidentified_request *unid,
670         const char *name)
671 {
672         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
673
674         ao2_wrlock(unid);
675         unid->count++;
676
677         if (ms < (unidentified_period * 1000) && unid->count >= unidentified_count) {
678                 log_failed_request(rdata, "No matching endpoint found", unid->count, ms);
679                 ast_sip_report_invalid_endpoint(name, rdata);
680         }
681         ao2_unlock(unid);
682 }
683
684 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint);
685 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint);
686
687 static void apply_acls(pjsip_rx_data *rdata)
688 {
689         struct ast_sip_endpoint *endpoint;
690
691         /* Is the endpoint allowed with the source or contact address? */
692         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
693         if (endpoint != artificial_endpoint
694                 && (apply_endpoint_acl(rdata, endpoint)
695                         || apply_endpoint_contact_acl(rdata, endpoint))) {
696                 ast_debug(1, "Endpoint '%s' not allowed by ACL\n",
697                         ast_sorcery_object_get_id(endpoint));
698
699                 /* Replace the rdata endpoint with the artificial endpoint. */
700                 ao2_replace(rdata->endpt_info.mod_data[endpoint_mod.id], artificial_endpoint);
701         }
702 }
703
704 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
705 {
706         struct ast_sip_endpoint *endpoint;
707         struct unidentified_request *unid;
708         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
709
710         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
711         if (endpoint) {
712                 /*
713                  * ao2_find with OBJ_UNLINK always write locks the container before even searching
714                  * for the object.  Since the majority case is that the object won't be found, do
715                  * the find without OBJ_UNLINK to prevent the unnecessary write lock, then unlink
716                  * if needed.
717                  */
718                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
719                 if (unid) {
720                         ao2_unlink(unidentified_requests, unid);
721                         ao2_ref(unid, -1);
722                 }
723                 apply_acls(rdata);
724                 return PJ_FALSE;
725         }
726
727         endpoint = ast_sip_identify_endpoint(rdata);
728         if (endpoint) {
729                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
730                 if (unid) {
731                         ao2_unlink(unidentified_requests, unid);
732                         ao2_ref(unid, -1);
733                 }
734         }
735
736         if (!endpoint) {
737                 /* always use an artificial endpoint - per discussion no reason
738                    to have "alwaysauthreject" as an option.  It is felt using it
739                    was a bug fix and it is not needed since we are not worried about
740                    breaking old stuff and we really don't want to enable the discovery
741                    of SIP accounts */
742                 endpoint = ast_sip_get_artificial_endpoint();
743         }
744
745         /* endpoint ref held by mod_data[] */
746         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
747
748         if (endpoint == artificial_endpoint && !is_ack) {
749                 char name[AST_UUID_STR_LEN] = "";
750                 pjsip_uri *from = rdata->msg_info.from->uri;
751
752                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
753                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
754                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
755                 }
756
757                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
758                 if (unid) {
759                         check_endpoint(rdata, unid, name);
760                         ao2_ref(unid, -1);
761                 } else if (using_auth_username) {
762                         ao2_wrlock(unidentified_requests);
763                         /* Checking again with the write lock held allows us to eliminate the DUPS_REPLACE and sort_fn */
764                         unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name,
765                                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
766                         if (unid) {
767                                 check_endpoint(rdata, unid, name);
768                         } else {
769                                 unid = ao2_alloc_options(sizeof(*unid) + strlen(rdata->pkt_info.src_name) + 1,
770                                         NULL, AO2_ALLOC_OPT_LOCK_RWLOCK);
771                                 if (!unid) {
772                                         ao2_unlock(unidentified_requests);
773                                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
774                                         return PJ_TRUE;
775                                 }
776                                 strcpy(unid->src_name, rdata->pkt_info.src_name); /* Safe */
777                                 unid->first_seen = ast_tvnow();
778                                 unid->count = 1;
779                                 ao2_link_flags(unidentified_requests, unid, OBJ_NOLOCK);
780                         }
781                         ao2_ref(unid, -1);
782                         ao2_unlock(unidentified_requests);
783                 } else {
784                         log_failed_request(rdata, "No matching endpoint found", 0, 0);
785                         ast_sip_report_invalid_endpoint(name, rdata);
786                 }
787         }
788
789         apply_acls(rdata);
790         return PJ_FALSE;
791 }
792
793 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
794 {
795         struct ast_sockaddr addr;
796
797         if (ast_acl_list_is_empty(endpoint->acl)) {
798                 return 0;
799         }
800
801         memset(&addr, 0, sizeof(addr));
802         ast_sockaddr_parse(&addr, rdata->pkt_info.src_name, PARSE_PORT_FORBID);
803         ast_sockaddr_set_port(&addr, rdata->pkt_info.src_port);
804
805         if (ast_apply_acl(endpoint->acl, &addr, "SIP ACL: ") != AST_SENSE_ALLOW) {
806                 log_failed_request(rdata, "Not match Endpoint ACL", 0, 0);
807                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_acl");
808                 return 1;
809         }
810         return 0;
811 }
812
813 static int extract_contact_addr(pjsip_contact_hdr *contact, struct ast_sockaddr **addrs)
814 {
815         pjsip_sip_uri *sip_uri;
816         char host[256];
817
818         if (!contact || contact->star) {
819                 *addrs = NULL;
820                 return 0;
821         }
822         if (!PJSIP_URI_SCHEME_IS_SIP(contact->uri) && !PJSIP_URI_SCHEME_IS_SIPS(contact->uri)) {
823                 *addrs = NULL;
824                 return 0;
825         }
826         sip_uri = pjsip_uri_get_uri(contact->uri);
827         ast_copy_pj_str(host, &sip_uri->host, sizeof(host));
828         return ast_sockaddr_resolve(addrs, host, PARSE_PORT_FORBID, AST_AF_UNSPEC);
829 }
830
831 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
832 {
833         int num_contact_addrs;
834         int forbidden = 0;
835         struct ast_sockaddr *contact_addrs;
836         int i;
837         pjsip_contact_hdr *contact = (pjsip_contact_hdr *)&rdata->msg_info.msg->hdr;
838
839         if (ast_acl_list_is_empty(endpoint->contact_acl)) {
840                 return 0;
841         }
842
843         while ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, contact->next))) {
844                 num_contact_addrs = extract_contact_addr(contact, &contact_addrs);
845                 if (num_contact_addrs <= 0) {
846                         continue;
847                 }
848                 for (i = 0; i < num_contact_addrs; ++i) {
849                         if (ast_apply_acl(endpoint->contact_acl, &contact_addrs[i], "SIP Contact ACL: ") != AST_SENSE_ALLOW) {
850                                 log_failed_request(rdata, "Not match Endpoint Contact ACL", 0, 0);
851                                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_contact_acl");
852                                 forbidden = 1;
853                                 break;
854                         }
855                 }
856                 ast_free(contact_addrs);
857                 if (forbidden) {
858                         /* No use checking other contacts if we already have failed ACL check */
859                         break;
860                 }
861         }
862
863         return forbidden;
864 }
865
866 static pj_bool_t authenticate(pjsip_rx_data *rdata)
867 {
868         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
869         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
870
871         ast_assert(endpoint != NULL);
872
873         if (is_ack) {
874                 return PJ_FALSE;
875         }
876
877         if (ast_sip_requires_authentication(endpoint, rdata)) {
878                 pjsip_tx_data *tdata;
879                 struct unidentified_request *unid;
880
881                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
882                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
883                 case AST_SIP_AUTHENTICATION_CHALLENGE:
884                         /* Send the 401 we created for them */
885                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
886                         if (pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL) != PJ_SUCCESS) {
887                                 pjsip_tx_data_dec_ref(tdata);
888                         }
889                         return PJ_TRUE;
890                 case AST_SIP_AUTHENTICATION_SUCCESS:
891                         /* See note in endpoint_lookup about not holding an unnecessary write lock */
892                         unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
893                         if (unid) {
894                                 ao2_unlink(unidentified_requests, unid);
895                                 ao2_ref(unid, -1);
896                         }
897                         ast_sip_report_auth_success(endpoint, rdata);
898                         break;
899                 case AST_SIP_AUTHENTICATION_FAILED:
900                         log_failed_request(rdata, "Failed to authenticate", 0, 0);
901                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
902                         if (pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL) != PJ_SUCCESS) {
903                                 pjsip_tx_data_dec_ref(tdata);
904                         }
905                         return PJ_TRUE;
906                 case AST_SIP_AUTHENTICATION_ERROR:
907                         log_failed_request(rdata, "Error to authenticate", 0, 0);
908                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
909                         pjsip_tx_data_dec_ref(tdata);
910                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
911                         return PJ_TRUE;
912                 }
913                 pjsip_tx_data_dec_ref(tdata);
914         } else if (endpoint == artificial_endpoint) {
915                 /* Uh. Oh.  The artificial endpoint couldn't challenge so block the request. */
916                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
917                 return PJ_TRUE;
918         }
919
920         return PJ_FALSE;
921 }
922
923 static pjsip_module auth_mod = {
924         .name = {"Request Authenticator", 21},
925         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
926         .on_rx_request = authenticate,
927 };
928
929 static int distribute(void *data)
930 {
931         static pjsip_process_rdata_param param = {
932                 .start_mod = &distributor_mod,
933                 .idx_after_start = 1,
934         };
935         pj_bool_t handled = PJ_FALSE;
936         pjsip_rx_data *rdata = data;
937         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
938         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
939         struct ast_sip_endpoint *endpoint;
940
941         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
942         if (!handled && is_request && !is_ack) {
943                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
944         }
945
946         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
947          * is the only appropriate spot to actually decrement the reference.
948          */
949         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
950         ao2_cleanup(endpoint);
951         pjsip_rx_data_free_cloned(rdata);
952         return 0;
953 }
954
955 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
956 {
957         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
958         if (endpoint) {
959                 ao2_ref(endpoint, +1);
960         }
961         return endpoint;
962 }
963
964 static int suspects_sort(const void *obj, const void *arg, int flags)
965 {
966         const struct unidentified_request *object_left = obj;
967         const struct unidentified_request *object_right = arg;
968         const char *right_key = arg;
969         int cmp;
970
971         switch (flags & OBJ_SEARCH_MASK) {
972         case OBJ_SEARCH_OBJECT:
973                 right_key = object_right->src_name;
974                 /* Fall through */
975         case OBJ_SEARCH_KEY:
976                 cmp = strcmp(object_left->src_name, right_key);
977                 break;
978         case OBJ_SEARCH_PARTIAL_KEY:
979                 cmp = strncmp(object_left->src_name, right_key, strlen(right_key));
980                 break;
981         default:
982                 cmp = 0;
983                 break;
984         }
985         return cmp;
986 }
987
988 static int suspects_compare(void *obj, void *arg, int flags)
989 {
990         const struct unidentified_request *object_left = obj;
991         const struct unidentified_request *object_right = arg;
992         const char *right_key = arg;
993         int cmp = 0;
994
995         switch (flags & OBJ_SEARCH_MASK) {
996         case OBJ_SEARCH_OBJECT:
997                 right_key = object_right->src_name;
998                 /* Fall through */
999         case OBJ_SEARCH_KEY:
1000                 if (strcmp(object_left->src_name, right_key) == 0) {
1001                         cmp = CMP_MATCH;
1002                 }
1003                 break;
1004         case OBJ_SEARCH_PARTIAL_KEY:
1005                 if (strncmp(object_left->src_name, right_key, strlen(right_key)) == 0) {
1006                         cmp = CMP_MATCH;
1007                 }
1008                 break;
1009         default:
1010                 cmp = 0;
1011                 break;
1012         }
1013         return cmp;
1014 }
1015
1016 static int suspects_hash(const void *obj, int flags)
1017 {
1018         const struct unidentified_request *object;
1019         const char *key;
1020
1021         switch (flags & OBJ_SEARCH_MASK) {
1022         case OBJ_SEARCH_KEY:
1023                 key = obj;
1024                 break;
1025         case OBJ_SEARCH_OBJECT:
1026                 object = obj;
1027                 key = object->src_name;
1028                 break;
1029         default:
1030                 /* Hash can only work on something with a full key. */
1031                 ast_assert(0);
1032                 return 0;
1033         }
1034         return ast_str_hash(key);
1035 }
1036
1037 static struct ao2_container *cli_unid_get_container(const char *regex)
1038 {
1039         struct ao2_container *s_container;
1040
1041         s_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
1042                 suspects_sort, suspects_compare);
1043         if (!s_container) {
1044                 return NULL;
1045         }
1046
1047         if (ao2_container_dup(s_container, unidentified_requests, 0)) {
1048                 ao2_ref(s_container, -1);
1049                 return NULL;
1050         }
1051
1052         return s_container;
1053 }
1054
1055 static int cli_unid_iterate(void *container, ao2_callback_fn callback, void *args)
1056 {
1057         ao2_callback(container, 0, callback, args);
1058
1059         return 0;
1060 }
1061
1062 static void *cli_unid_retrieve_by_id(const char *id)
1063 {
1064         return ao2_find(unidentified_requests, id, OBJ_SEARCH_KEY);
1065 }
1066
1067 static const char *cli_unid_get_id(const void *obj)
1068 {
1069         const struct unidentified_request *unid = obj;
1070
1071         return unid->src_name;
1072 }
1073
1074 static int cli_unid_print_header(void *obj, void *arg, int flags)
1075 {
1076         struct ast_sip_cli_context *context = arg;
1077         RAII_VAR(struct ast_sip_cli_formatter_entry *, formatter_entry, NULL, ao2_cleanup);
1078
1079         int indent = CLI_INDENT_TO_SPACES(context->indent_level);
1080         int filler = CLI_LAST_TABSTOP - indent - 7;
1081
1082         ast_assert(context->output_buffer != NULL);
1083
1084         ast_str_append(&context->output_buffer, 0,
1085                 "%*s:  <IP Address%*.*s>  <Count> <Age(sec)>\n",
1086                 indent, "Request", filler, filler, CLI_HEADER_FILLER);
1087
1088         return 0;
1089 }
1090
1091 static int cli_unid_print_body(void *obj, void *arg, int flags)
1092 {
1093         struct unidentified_request *unid = obj;
1094         struct ast_sip_cli_context *context = arg;
1095         int indent;
1096         int flexwidth;
1097         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
1098
1099         ast_assert(context->output_buffer != NULL);
1100
1101         indent = CLI_INDENT_TO_SPACES(context->indent_level);
1102         flexwidth = CLI_LAST_TABSTOP - 4;
1103
1104         ast_str_append(&context->output_buffer, 0, "%*s:  %-*.*s  %7d %10.3f\n",
1105                 indent,
1106                 "Request",
1107                 flexwidth, flexwidth,
1108                 unid->src_name, unid->count,  ms / 1000.0);
1109
1110         return 0;
1111 }
1112
1113 static struct ast_cli_entry cli_commands[] = {
1114         AST_CLI_DEFINE(ast_sip_cli_traverse_objects, "Show PJSIP Unidentified Requests",
1115                 .command = "pjsip show unidentified_requests",
1116                 .usage = "Usage: pjsip show unidentified_requests\n"
1117                                 "       Show the PJSIP Unidentified Requests\n"),
1118 };
1119
1120 struct ast_sip_cli_formatter_entry *unid_formatter;
1121
1122 static int expire_requests(void *object, void *arg, int flags)
1123 {
1124         struct unidentified_request *unid = object;
1125         int *maxage = arg;
1126         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
1127
1128         if (ms > (*maxage) * 2 * 1000) {
1129                 return CMP_MATCH;
1130         }
1131
1132         return 0;
1133 }
1134
1135 static int prune_task(const void *data)
1136 {
1137         unsigned int maxage;
1138
1139         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
1140         maxage = unidentified_period * 2;
1141         ao2_callback(unidentified_requests, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, expire_requests, &maxage);
1142
1143         return unidentified_prune_interval * 1000;
1144 }
1145
1146 static int clean_task(const void *data)
1147 {
1148         return 0;
1149 }
1150
1151 static void global_loaded(const char *object_type)
1152 {
1153         char default_realm[MAX_REALM_LENGTH + 1];
1154         struct ast_sip_auth *fake_auth;
1155         char *identifier_order;
1156
1157         /* Update using_auth_username */
1158         identifier_order = ast_sip_get_endpoint_identifier_order();
1159         if (identifier_order) {
1160                 char *identify_method;
1161                 char *io_copy = ast_strdupa(identifier_order);
1162                 int new_using = 0;
1163
1164                 ast_free(identifier_order);
1165                 while ((identify_method = ast_strip(strsep(&io_copy, ",")))) {
1166                         if (!strcmp(identify_method, "auth_username")) {
1167                                 new_using = 1;
1168                                 break;
1169                         }
1170                 }
1171                 using_auth_username = new_using;
1172         }
1173
1174         /* Update default_realm of artificial_auth */
1175         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
1176         fake_auth = ast_sip_get_artificial_auth();
1177         if (!fake_auth || strcmp(fake_auth->realm, default_realm)) {
1178                 ao2_cleanup(fake_auth);
1179
1180                 fake_auth = alloc_artificial_auth(default_realm);
1181                 if (fake_auth) {
1182                         ao2_global_obj_replace_unref(artificial_auth, fake_auth);
1183                 }
1184         }
1185         ao2_cleanup(fake_auth);
1186
1187         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
1188
1189         /* Clean out the old task, if any */
1190         ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
1191         /* Have to do something with the return value to shut up the stupid compiler. */
1192         if (ast_sched_add_variable(prune_context, unidentified_prune_interval * 1000, prune_task, NULL, 1) < 0) {
1193                 return;
1194         }
1195 }
1196
1197 /*! \brief Observer which is used to update our interval and default_realm when the global setting changes */
1198 static struct ast_sorcery_observer global_observer = {
1199         .loaded = global_loaded,
1200 };
1201
1202 /*!
1203  * \internal
1204  * \brief Shutdown the serializers in the distributor pool.
1205  * \since 13.10.0
1206  *
1207  * \return Nothing
1208  */
1209 static void distributor_pool_shutdown(void)
1210 {
1211         int idx;
1212
1213         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1214                 ast_taskprocessor_unreference(distributor_pool[idx]);
1215                 distributor_pool[idx] = NULL;
1216         }
1217 }
1218
1219 /*!
1220  * \internal
1221  * \brief Setup the serializers in the distributor pool.
1222  * \since 13.10.0
1223  *
1224  * \retval 0 on success.
1225  * \retval -1 on error.
1226  */
1227 static int distributor_pool_setup(void)
1228 {
1229         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1230         int idx;
1231
1232         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1233                 /* Create name with seq number appended. */
1234                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
1235
1236                 distributor_pool[idx] = ast_sip_create_serializer(tps_name);
1237                 if (!distributor_pool[idx]) {
1238                         return -1;
1239                 }
1240         }
1241         return 0;
1242 }
1243
1244 int ast_sip_initialize_distributor(void)
1245 {
1246         unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1247                 DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
1248         if (!unidentified_requests) {
1249                 return -1;
1250         }
1251
1252         dialog_associations = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1253                 DIALOG_ASSOCIATIONS_BUCKETS, dialog_associations_hash, NULL,
1254                 dialog_associations_cmp);
1255         if (!dialog_associations) {
1256                 ast_sip_destroy_distributor();
1257                 return -1;
1258         }
1259
1260         if (distributor_pool_setup()) {
1261                 ast_sip_destroy_distributor();
1262                 return -1;
1263         }
1264
1265         prune_context = ast_sched_context_create();
1266         if (!prune_context) {
1267                 ast_sip_destroy_distributor();
1268                 return -1;
1269         }
1270
1271         if (ast_sched_start_thread(prune_context)) {
1272                 ast_sip_destroy_distributor();
1273                 return -1;
1274         }
1275
1276         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
1277         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
1278
1279         if (create_artificial_endpoint() || create_artificial_auth()) {
1280                 ast_sip_destroy_distributor();
1281                 return -1;
1282         }
1283
1284         if (ast_sip_register_service(&distributor_mod)) {
1285                 ast_sip_destroy_distributor();
1286                 return -1;
1287         }
1288         if (ast_sip_register_service(&endpoint_mod)) {
1289                 ast_sip_destroy_distributor();
1290                 return -1;
1291         }
1292         if (ast_sip_register_service(&auth_mod)) {
1293                 ast_sip_destroy_distributor();
1294                 return -1;
1295         }
1296
1297         unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
1298                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1299         if (!unid_formatter) {
1300                 ast_sip_destroy_distributor();
1301                 ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
1302                 return -1;
1303         }
1304         unid_formatter->name = "unidentified_request";
1305         unid_formatter->print_header = cli_unid_print_header;
1306         unid_formatter->print_body = cli_unid_print_body;
1307         unid_formatter->get_container = cli_unid_get_container;
1308         unid_formatter->iterate = cli_unid_iterate;
1309         unid_formatter->get_id = cli_unid_get_id;
1310         unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
1311         ast_sip_register_cli_formatter(unid_formatter);
1312
1313         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
1314
1315         return 0;
1316 }
1317
1318 void ast_sip_destroy_distributor(void)
1319 {
1320         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
1321         ast_sip_unregister_cli_formatter(unid_formatter);
1322
1323         ast_sip_unregister_service(&auth_mod);
1324         ast_sip_unregister_service(&endpoint_mod);
1325         ast_sip_unregister_service(&distributor_mod);
1326
1327         ao2_global_obj_release(artificial_auth);
1328         ao2_cleanup(artificial_endpoint);
1329
1330         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
1331
1332         if (prune_context) {
1333                 ast_sched_context_destroy(prune_context);
1334         }
1335
1336         distributor_pool_shutdown();
1337
1338         ao2_cleanup(dialog_associations);
1339         ao2_cleanup(unidentified_requests);
1340 }