626f07cefbade5144023d423a9c99f48b2b53018
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "asterisk/acl.h"
25 #include "include/res_pjsip_private.h"
26 #include "asterisk/taskprocessor.h"
27 #include "asterisk/threadpool.h"
28 #include "asterisk/res_pjsip_cli.h"
29
30 static int distribute(void *data);
31 static pj_bool_t distributor(pjsip_rx_data *rdata);
32 static pj_status_t record_serializer(pjsip_tx_data *tdata);
33
34 static pjsip_module distributor_mod = {
35         .name = {"Request Distributor", 19},
36         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
37         .on_tx_request = record_serializer,
38         .on_rx_request = distributor,
39         .on_rx_response = distributor,
40 };
41
42 struct ast_sched_context *prune_context;
43
44 /* From the auth/realm realtime column size */
45 #define MAX_REALM_LENGTH 40
46
47 #define DEFAULT_SUSPECTS_BUCKETS 53
48
49 static struct ao2_container *unidentified_requests;
50 static unsigned int unidentified_count;
51 static unsigned int unidentified_period;
52 static unsigned int unidentified_prune_interval;
53 static int using_auth_username;
54
55 struct unidentified_request{
56         struct timeval first_seen;
57         int count;
58         char src_name[];
59 };
60
61 /*! Number of serializers in pool if one not otherwise known.  (Best if prime number) */
62 #define DISTRIBUTOR_POOL_SIZE           31
63
64 /*! Pool of serializers to use if not supplied. */
65 static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
66
67 /*!
68  * \internal
69  * \brief Record the task's serializer name on the tdata structure.
70  * \since 14.0.0
71  *
72  * \param tdata The outgoing message.
73  *
74  * \retval PJ_SUCCESS.
75  */
76 static pj_status_t record_serializer(pjsip_tx_data *tdata)
77 {
78         struct ast_taskprocessor *serializer;
79
80         serializer = ast_threadpool_serializer_get_current();
81         if (serializer) {
82                 const char *name;
83
84                 name = ast_taskprocessor_name(serializer);
85                 if (!ast_strlen_zero(name)
86                         && (!tdata->mod_data[distributor_mod.id]
87                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
88                         char *tdata_name;
89
90                         /* The serializer in use changed. */
91                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
92                         strcpy(tdata_name, name);/* Safe */
93
94                         tdata->mod_data[distributor_mod.id] = tdata_name;
95                 }
96         }
97
98         return PJ_SUCCESS;
99 }
100
101 /*!
102  * \internal
103  * \brief Find the request tdata to get the serializer it used.
104  * \since 14.0.0
105  *
106  * \param rdata The incoming message.
107  *
108  * \retval serializer on success.
109  * \retval NULL on error or could not find the serializer.
110  */
111 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
112 {
113         struct ast_taskprocessor *serializer = NULL;
114         pj_str_t tsx_key;
115         pjsip_transaction *tsx;
116
117         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
118                 &rdata->msg_info.cseq->method, rdata);
119
120         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
121         if (!tsx) {
122                 ast_debug(1, "Could not find transaction for %s.\n",
123                         pjsip_rx_data_get_info(rdata));
124                 return NULL;
125         }
126         ast_debug(3, "Found transaction %s for %s.\n",
127                 tsx->obj_name, pjsip_rx_data_get_info(rdata));
128
129         if (tsx->last_tx) {
130                 const char *serializer_name;
131
132                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
133                 if (!ast_strlen_zero(serializer_name)) {
134                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
135                         if (serializer) {
136                                 ast_debug(3, "Found serializer %s on transaction %s\n",
137                                                 serializer_name, tsx->obj_name);
138                         }
139                 }
140         }
141
142 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
143         pj_grp_lock_release(tsx->grp_lock);
144 #else
145         pj_mutex_unlock(tsx->mutex);
146 #endif
147
148         return serializer;
149 }
150
151 /*! Dialog-specific information the distributor uses */
152 struct distributor_dialog_data {
153         /*! dialog_associations ao2 container key */
154         pjsip_dialog *dlg;
155         /*! Serializer to distribute tasks to for this dialog */
156         struct ast_taskprocessor *serializer;
157         /*! Endpoint associated with this dialog */
158         struct ast_sip_endpoint *endpoint;
159 };
160
161 #define DIALOG_ASSOCIATIONS_BUCKETS 251
162
163 static struct ao2_container *dialog_associations;
164
165 /*!
166  * \internal
167  * \brief Compute a hash value on an arbitrary buffer.
168  * \since 13.17.0
169  *
170  * \param[in] pos The buffer to add to the hash
171  * \param[in] len The buffer length to add to the hash
172  * \param[in] hash The hash value to add to
173  *
174  * \details
175  * This version of the function is for when you need to compute a
176  * hash of more than one buffer.
177  *
178  * This famous hash algorithm was written by Dan Bernstein and is
179  * commonly used.
180  *
181  * \sa http://www.cse.yorku.ca/~oz/hash.html
182  */
183 static int buf_hash_add(const char *pos, size_t len, int hash)
184 {
185         while (len--) {
186                 hash = hash * 33 ^ *pos++;
187         }
188
189         return hash;
190 }
191
192 /*!
193  * \internal
194  * \brief Compute a hash value on an arbitrary buffer.
195  * \since 13.17.0
196  *
197  * \param[in] pos The buffer to add to the hash
198  * \param[in] len The buffer length to add to the hash
199  *
200  * \details
201  * This version of the function is for when you need to compute a
202  * hash of more than one buffer.
203  *
204  * This famous hash algorithm was written by Dan Bernstein and is
205  * commonly used.
206  *
207  * \sa http://www.cse.yorku.ca/~oz/hash.html
208  */
209 static int buf_hash(const char *pos, size_t len)
210 {
211         return buf_hash_add(pos, len, 5381);
212 }
213
214 static int dialog_associations_hash(const void *obj, int flags)
215 {
216         const struct distributor_dialog_data *object;
217         union {
218                 const pjsip_dialog *dlg;
219                 const char buf[sizeof(pjsip_dialog *)];
220         } key;
221
222         switch (flags & OBJ_SEARCH_MASK) {
223         case OBJ_SEARCH_KEY:
224                 key.dlg = obj;
225                 break;
226         case OBJ_SEARCH_OBJECT:
227                 object = obj;
228                 key.dlg = object->dlg;
229                 break;
230         default:
231                 /* Hash can only work on something with a full key. */
232                 ast_assert(0);
233                 return 0;
234         }
235         return ast_str_hash_restrict(buf_hash(key.buf, sizeof(key.buf)));
236 }
237
238 static int dialog_associations_cmp(void *obj, void *arg, int flags)
239 {
240         const struct distributor_dialog_data *object_left = obj;
241         const struct distributor_dialog_data *object_right = arg;
242         const pjsip_dialog *right_key = arg;
243         int cmp = 0;
244
245         switch (flags & OBJ_SEARCH_MASK) {
246         case OBJ_SEARCH_OBJECT:
247                 right_key = object_right->dlg;
248                 /* Fall through */
249         case OBJ_SEARCH_KEY:
250                 if (object_left->dlg == right_key) {
251                         cmp = CMP_MATCH;
252                 }
253                 break;
254         case OBJ_SEARCH_PARTIAL_KEY:
255                 /* There is no such thing for this container. */
256                 ast_assert(0);
257                 break;
258         default:
259                 cmp = 0;
260                 break;
261         }
262         return cmp;
263 }
264
265 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
266 {
267         struct distributor_dialog_data *dist;
268
269         ao2_wrlock(dialog_associations);
270         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
271         if (!dist) {
272                 if (serializer) {
273                         dist = ao2_alloc(sizeof(*dist), NULL);
274                         if (dist) {
275                                 dist->dlg = dlg;
276                                 dist->serializer = serializer;
277                                 ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
278                                 ao2_ref(dist, -1);
279                         }
280                 }
281         } else {
282                 ao2_lock(dist);
283                 dist->serializer = serializer;
284                 if (!dist->serializer && !dist->endpoint) {
285                         ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
286                 }
287                 ao2_unlock(dist);
288                 ao2_ref(dist, -1);
289         }
290         ao2_unlock(dialog_associations);
291 }
292
293 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
294 {
295         struct distributor_dialog_data *dist;
296
297         ao2_wrlock(dialog_associations);
298         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
299         if (!dist) {
300                 if (endpoint) {
301                         dist = ao2_alloc(sizeof(*dist), NULL);
302                         if (dist) {
303                                 dist->dlg = dlg;
304                                 dist->endpoint = endpoint;
305                                 ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
306                                 ao2_ref(dist, -1);
307                         }
308                 }
309         } else {
310                 ao2_lock(dist);
311                 dist->endpoint = endpoint;
312                 if (!dist->serializer && !dist->endpoint) {
313                         ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
314                 }
315                 ao2_unlock(dist);
316                 ao2_ref(dist, -1);
317         }
318         ao2_unlock(dialog_associations);
319 }
320
321 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
322 {
323         struct distributor_dialog_data *dist;
324         struct ast_sip_endpoint *endpoint;
325
326         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
327         if (dist) {
328                 ao2_lock(dist);
329                 endpoint = ao2_bump(dist->endpoint);
330                 ao2_unlock(dist);
331                 ao2_ref(dist, -1);
332         } else {
333                 endpoint = NULL;
334         }
335         return endpoint;
336 }
337
338 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
339 {
340         pj_str_t tsx_key;
341         pjsip_transaction *tsx;
342         pjsip_dialog *dlg;
343         pj_str_t *local_tag;
344         pj_str_t *remote_tag;
345
346         if (!rdata->msg_info.msg) {
347                 return NULL;
348         }
349
350         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
351                 local_tag = &rdata->msg_info.to->tag;
352                 remote_tag = &rdata->msg_info.from->tag;
353         } else {
354                 local_tag = &rdata->msg_info.from->tag;
355                 remote_tag = &rdata->msg_info.to->tag;
356         }
357
358         /* We can only call the convenient method for
359          *  1) responses
360          *  2) non-CANCEL requests
361          *  3) CANCEL requests with a to-tag
362          */
363         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
364                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
365                         rdata->msg_info.to->tag.slen != 0) {
366                 dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
367                                 remote_tag, PJ_FALSE);
368                 if (dlg) {
369                         return dlg;
370                 }
371         }
372
373         /*
374          * There may still be a matching dialog if this is
375          * 1) an incoming CANCEL request without a to-tag
376          * 2) an incoming response to a dialog-creating request.
377          */
378         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
379                 /* CANCEL requests will need to match the INVITE we initially received. Any
380                  * other request type will either have been matched already or is not in
381                  * dialog
382                  */
383                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
384                                 pjsip_get_invite_method(), rdata);
385         } else {
386                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
387                                 &rdata->msg_info.cseq->method, rdata);
388         }
389
390         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
391         if (!tsx) {
392                 ast_debug(3, "Could not find matching transaction for %s\n",
393                         pjsip_rx_data_get_info(rdata));
394                 return NULL;
395         }
396
397         dlg = pjsip_tsx_get_dlg(tsx);
398
399 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
400         pj_grp_lock_release(tsx->grp_lock);
401 #else
402         pj_mutex_unlock(tsx->mutex);
403 #endif
404
405         return dlg;
406 }
407
408 /*!
409  * \internal
410  * \brief Compute a hash value on a pjlib string
411  * \since 13.10.0
412  *
413  * \param[in] str The pjlib string to add to the hash
414  * \param[in] hash The hash value to add to
415  *
416  * \details
417  * This version of the function is for when you need to compute a
418  * string hash of more than one string.
419  *
420  * This famous hash algorithm was written by Dan Bernstein and is
421  * commonly used.
422  *
423  * \sa http://www.cse.yorku.ca/~oz/hash.html
424  */
425 static int pjstr_hash_add(pj_str_t *str, int hash)
426 {
427         return buf_hash_add(pj_strbuf(str), pj_strlen(str), hash);
428 }
429
430 /*!
431  * \internal
432  * \brief Compute a hash value on a pjlib string
433  * \since 13.10.0
434  *
435  * \param[in] str The pjlib string to hash
436  *
437  * This famous hash algorithm was written by Dan Bernstein and is
438  * commonly used.
439  *
440  * http://www.cse.yorku.ca/~oz/hash.html
441  */
442 static int pjstr_hash(pj_str_t *str)
443 {
444         return pjstr_hash_add(str, 5381);
445 }
446
447 struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
448 {
449         int hash;
450         pj_str_t *remote_tag;
451         struct ast_taskprocessor *serializer;
452
453         if (!rdata->msg_info.msg) {
454                 return NULL;
455         }
456
457         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
458                 remote_tag = &rdata->msg_info.from->tag;
459         } else {
460                 remote_tag = &rdata->msg_info.to->tag;
461         }
462
463         /* Compute the hash from the SIP message call-id and remote-tag */
464         hash = pjstr_hash(&rdata->msg_info.cid->id);
465         hash = pjstr_hash_add(remote_tag, hash);
466         hash = ast_str_hash_restrict(hash);
467
468         serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
469         if (serializer) {
470                 ast_debug(3, "Calculated serializer %s to use for %s\n",
471                         ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
472         }
473         return serializer;
474 }
475
476 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
477
478 static pjsip_module endpoint_mod = {
479         .name = {"Endpoint Identifier", 19},
480         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
481         .on_rx_request = endpoint_lookup,
482 };
483
484 static pj_bool_t distributor(pjsip_rx_data *rdata)
485 {
486         pjsip_dialog *dlg;
487         struct distributor_dialog_data *dist = NULL;
488         struct ast_taskprocessor *serializer = NULL;
489         pjsip_rx_data *clone;
490
491         if (!ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
492                 /*
493                  * Ignore everything until we are fully booted.  Let the
494                  * peer retransmit messages until we are ready.
495                  */
496                 return PJ_TRUE;
497         }
498
499         dlg = find_dialog(rdata);
500         if (dlg) {
501                 ast_debug(3, "Searching for serializer associated with dialog %s for %s\n",
502                         dlg->obj_name, pjsip_rx_data_get_info(rdata));
503                 dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
504                 if (dist) {
505                         ao2_lock(dist);
506                         serializer = ao2_bump(dist->serializer);
507                         ao2_unlock(dist);
508                         if (serializer) {
509                                 ast_debug(3, "Found serializer %s associated with dialog %s\n",
510                                         ast_taskprocessor_name(serializer), dlg->obj_name);
511                         }
512                 }
513         }
514
515         if (serializer) {
516                 /* We have a serializer so we know where to send the message. */
517         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
518                 ast_debug(3, "No dialog serializer for %s.  Using request transaction as basis.\n",
519                         pjsip_rx_data_get_info(rdata));
520                 serializer = find_request_serializer(rdata);
521                 if (!serializer) {
522                         /*
523                          * Pick a serializer for the unmatched response.
524                          * We couldn't determine what serializer originally
525                          * sent the request or the serializer is gone.
526                          */
527                         serializer = ast_sip_get_distributor_serializer(rdata);
528                 }
529         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
530                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
531                 /* We have a BYE or CANCEL request without a serializer. */
532                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
533                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
534                 ao2_cleanup(dist);
535                 return PJ_TRUE;
536         } else {
537                 if (ast_taskprocessor_alert_get()) {
538                         /*
539                          * When taskprocessors get backed up, there is a good chance that
540                          * we are being overloaded and need to defer adding new work to
541                          * the system.  To defer the work we will ignore the request and
542                          * rely on the peer's transport layer to retransmit the message.
543                          * We usually work off the overload within a few seconds.  The
544                          * alternative is to send back a 503 response to these requests
545                          * and be done with it.
546                          */
547                         ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
548                                 pjsip_rx_data_get_info(rdata));
549                         ao2_cleanup(dist);
550                         return PJ_TRUE;
551                 }
552
553                 /* Pick a serializer for the out-of-dialog request. */
554                 serializer = ast_sip_get_distributor_serializer(rdata);
555         }
556
557         if (pjsip_rx_data_clone(rdata, 0, &clone) != PJ_SUCCESS) {
558                 ast_taskprocessor_unreference(serializer);
559                 ao2_cleanup(dist);
560                 return PJ_TRUE;
561         }
562
563         if (dist) {
564                 ao2_lock(dist);
565                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
566                 ao2_unlock(dist);
567                 ao2_cleanup(dist);
568         }
569
570         if (ast_sip_push_task(serializer, distribute, clone)) {
571                 ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
572                 pjsip_rx_data_free_cloned(clone);
573         }
574
575         ast_taskprocessor_unreference(serializer);
576
577         return PJ_TRUE;
578 }
579
580 static struct ast_sip_auth *alloc_artificial_auth(char *default_realm)
581 {
582         struct ast_sip_auth *fake_auth;
583
584         fake_auth = ast_sorcery_alloc(ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE,
585                 "artificial");
586         if (!fake_auth) {
587                 return NULL;
588         }
589
590         ast_string_field_set(fake_auth, realm, default_realm);
591         ast_string_field_set(fake_auth, auth_user, "");
592         ast_string_field_set(fake_auth, auth_pass, "");
593         fake_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
594
595         return fake_auth;
596 }
597
598 static AO2_GLOBAL_OBJ_STATIC(artificial_auth);
599
600 static int create_artificial_auth(void)
601 {
602         char default_realm[MAX_REALM_LENGTH + 1];
603         struct ast_sip_auth *fake_auth;
604
605         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
606         fake_auth = alloc_artificial_auth(default_realm);
607         if (!fake_auth) {
608                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
609                 return -1;
610         }
611
612         ao2_global_obj_replace_unref(artificial_auth, fake_auth);
613         ao2_ref(fake_auth, -1);
614         return 0;
615 }
616
617 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
618 {
619         return ao2_global_obj_ref(artificial_auth);
620 }
621
622 static struct ast_sip_endpoint *artificial_endpoint = NULL;
623
624 static int create_artificial_endpoint(void)
625 {
626         artificial_endpoint = ast_sorcery_alloc(ast_sip_get_sorcery(), "endpoint", NULL);
627         if (!artificial_endpoint) {
628                 return -1;
629         }
630
631         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
632         /* Pushing a bogus value into the vector will ensure that
633          * the proper size of the vector is returned. This value is
634          * not actually used anywhere
635          */
636         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
637         return 0;
638 }
639
640 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
641 {
642         ao2_ref(artificial_endpoint, +1);
643         return artificial_endpoint;
644 }
645
646 static void log_failed_request(pjsip_rx_data *rdata, char *msg, unsigned int count, unsigned int period)
647 {
648         char from_buf[PJSIP_MAX_URL_SIZE];
649         char callid_buf[PJSIP_MAX_URL_SIZE];
650         char method_buf[PJSIP_MAX_URL_SIZE];
651         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
652         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
653         ast_copy_pj_str(method_buf, &rdata->msg_info.msg->line.req.method.name, PJSIP_MAX_URL_SIZE);
654         if (count) {
655                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s:%d' (callid: %s) - %s"
656                         " after %u tries in %.3f ms\n",
657                         method_buf, from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, msg, count, period / 1000.0);
658         } else {
659                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s:%d' (callid: %s) - %s\n",
660                         method_buf, from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, msg);
661         }
662 }
663
664 static void check_endpoint(pjsip_rx_data *rdata, struct unidentified_request *unid,
665         const char *name)
666 {
667         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
668
669         ao2_wrlock(unid);
670         unid->count++;
671
672         if (ms < (unidentified_period * 1000) && unid->count >= unidentified_count) {
673                 log_failed_request(rdata, "No matching endpoint found", unid->count, ms);
674                 ast_sip_report_invalid_endpoint(name, rdata);
675         }
676         ao2_unlock(unid);
677 }
678
679 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint);
680 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint);
681
682 static void apply_acls(pjsip_rx_data *rdata)
683 {
684         struct ast_sip_endpoint *endpoint;
685
686         /* Is the endpoint allowed with the source or contact address? */
687         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
688         if (endpoint != artificial_endpoint
689                 && (apply_endpoint_acl(rdata, endpoint)
690                         || apply_endpoint_contact_acl(rdata, endpoint))) {
691                 ast_debug(1, "Endpoint '%s' not allowed by ACL\n",
692                         ast_sorcery_object_get_id(endpoint));
693
694                 /* Replace the rdata endpoint with the artificial endpoint. */
695                 ao2_replace(rdata->endpt_info.mod_data[endpoint_mod.id], artificial_endpoint);
696         }
697 }
698
699 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
700 {
701         struct ast_sip_endpoint *endpoint;
702         struct unidentified_request *unid;
703         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
704
705         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
706         if (endpoint) {
707                 /*
708                  * ao2_find with OBJ_UNLINK always write locks the container before even searching
709                  * for the object.  Since the majority case is that the object won't be found, do
710                  * the find without OBJ_UNLINK to prevent the unnecessary write lock, then unlink
711                  * if needed.
712                  */
713                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
714                 if (unid) {
715                         ao2_unlink(unidentified_requests, unid);
716                         ao2_ref(unid, -1);
717                 }
718                 apply_acls(rdata);
719                 return PJ_FALSE;
720         }
721
722         endpoint = ast_sip_identify_endpoint(rdata);
723         if (endpoint) {
724                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
725                 if (unid) {
726                         ao2_unlink(unidentified_requests, unid);
727                         ao2_ref(unid, -1);
728                 }
729         }
730
731         if (!endpoint) {
732                 /* always use an artificial endpoint - per discussion no reason
733                    to have "alwaysauthreject" as an option.  It is felt using it
734                    was a bug fix and it is not needed since we are not worried about
735                    breaking old stuff and we really don't want to enable the discovery
736                    of SIP accounts */
737                 endpoint = ast_sip_get_artificial_endpoint();
738         }
739
740         /* endpoint ref held by mod_data[] */
741         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
742
743         if (endpoint == artificial_endpoint && !is_ack) {
744                 char name[AST_UUID_STR_LEN] = "";
745                 pjsip_uri *from = rdata->msg_info.from->uri;
746
747                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
748                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
749                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
750                 }
751
752                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
753                 if (unid) {
754                         check_endpoint(rdata, unid, name);
755                         ao2_ref(unid, -1);
756                 } else if (using_auth_username) {
757                         ao2_wrlock(unidentified_requests);
758                         /* Checking again with the write lock held allows us to eliminate the DUPS_REPLACE and sort_fn */
759                         unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name,
760                                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
761                         if (unid) {
762                                 check_endpoint(rdata, unid, name);
763                         } else {
764                                 unid = ao2_alloc_options(sizeof(*unid) + strlen(rdata->pkt_info.src_name) + 1,
765                                         NULL, AO2_ALLOC_OPT_LOCK_RWLOCK);
766                                 if (!unid) {
767                                         ao2_unlock(unidentified_requests);
768                                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
769                                         return PJ_TRUE;
770                                 }
771                                 strcpy(unid->src_name, rdata->pkt_info.src_name); /* Safe */
772                                 unid->first_seen = ast_tvnow();
773                                 unid->count = 1;
774                                 ao2_link_flags(unidentified_requests, unid, OBJ_NOLOCK);
775                         }
776                         ao2_ref(unid, -1);
777                         ao2_unlock(unidentified_requests);
778                 } else {
779                         log_failed_request(rdata, "No matching endpoint found", 0, 0);
780                         ast_sip_report_invalid_endpoint(name, rdata);
781                 }
782         }
783
784         apply_acls(rdata);
785         return PJ_FALSE;
786 }
787
788 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
789 {
790         struct ast_sockaddr addr;
791
792         if (ast_acl_list_is_empty(endpoint->acl)) {
793                 return 0;
794         }
795
796         memset(&addr, 0, sizeof(addr));
797         ast_sockaddr_parse(&addr, rdata->pkt_info.src_name, PARSE_PORT_FORBID);
798         ast_sockaddr_set_port(&addr, rdata->pkt_info.src_port);
799
800         if (ast_apply_acl(endpoint->acl, &addr, "SIP ACL: ") != AST_SENSE_ALLOW) {
801                 log_failed_request(rdata, "Not match Endpoint ACL", 0, 0);
802                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_acl");
803                 return 1;
804         }
805         return 0;
806 }
807
808 static int extract_contact_addr(pjsip_contact_hdr *contact, struct ast_sockaddr **addrs)
809 {
810         pjsip_sip_uri *sip_uri;
811         char host[256];
812
813         if (!contact || contact->star) {
814                 *addrs = NULL;
815                 return 0;
816         }
817         if (!PJSIP_URI_SCHEME_IS_SIP(contact->uri) && !PJSIP_URI_SCHEME_IS_SIPS(contact->uri)) {
818                 *addrs = NULL;
819                 return 0;
820         }
821         sip_uri = pjsip_uri_get_uri(contact->uri);
822         ast_copy_pj_str(host, &sip_uri->host, sizeof(host));
823         return ast_sockaddr_resolve(addrs, host, PARSE_PORT_FORBID, AST_AF_UNSPEC);
824 }
825
826 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
827 {
828         int num_contact_addrs;
829         int forbidden = 0;
830         struct ast_sockaddr *contact_addrs;
831         int i;
832         pjsip_contact_hdr *contact = (pjsip_contact_hdr *)&rdata->msg_info.msg->hdr;
833
834         if (ast_acl_list_is_empty(endpoint->contact_acl)) {
835                 return 0;
836         }
837
838         while ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, contact->next))) {
839                 num_contact_addrs = extract_contact_addr(contact, &contact_addrs);
840                 if (num_contact_addrs <= 0) {
841                         continue;
842                 }
843                 for (i = 0; i < num_contact_addrs; ++i) {
844                         if (ast_apply_acl(endpoint->contact_acl, &contact_addrs[i], "SIP Contact ACL: ") != AST_SENSE_ALLOW) {
845                                 log_failed_request(rdata, "Not match Endpoint Contact ACL", 0, 0);
846                                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_contact_acl");
847                                 forbidden = 1;
848                                 break;
849                         }
850                 }
851                 ast_free(contact_addrs);
852                 if (forbidden) {
853                         /* No use checking other contacts if we already have failed ACL check */
854                         break;
855                 }
856         }
857
858         return forbidden;
859 }
860
861 static pj_bool_t authenticate(pjsip_rx_data *rdata)
862 {
863         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
864         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
865
866         ast_assert(endpoint != NULL);
867
868         if (is_ack) {
869                 return PJ_FALSE;
870         }
871
872         if (ast_sip_requires_authentication(endpoint, rdata)) {
873                 pjsip_tx_data *tdata;
874                 struct unidentified_request *unid;
875
876                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
877                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
878                 case AST_SIP_AUTHENTICATION_CHALLENGE:
879                         /* Send the 401 we created for them */
880                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
881                         if (pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL) != PJ_SUCCESS) {
882                                 pjsip_tx_data_dec_ref(tdata);
883                         }
884                         return PJ_TRUE;
885                 case AST_SIP_AUTHENTICATION_SUCCESS:
886                         /* See note in endpoint_lookup about not holding an unnecessary write lock */
887                         unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
888                         if (unid) {
889                                 ao2_unlink(unidentified_requests, unid);
890                                 ao2_ref(unid, -1);
891                         }
892                         ast_sip_report_auth_success(endpoint, rdata);
893                         break;
894                 case AST_SIP_AUTHENTICATION_FAILED:
895                         log_failed_request(rdata, "Failed to authenticate", 0, 0);
896                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
897                         if (pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL) != PJ_SUCCESS) {
898                                 pjsip_tx_data_dec_ref(tdata);
899                         }
900                         return PJ_TRUE;
901                 case AST_SIP_AUTHENTICATION_ERROR:
902                         log_failed_request(rdata, "Error to authenticate", 0, 0);
903                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
904                         pjsip_tx_data_dec_ref(tdata);
905                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
906                         return PJ_TRUE;
907                 }
908                 pjsip_tx_data_dec_ref(tdata);
909         } else if (endpoint == artificial_endpoint) {
910                 /* Uh. Oh.  The artificial endpoint couldn't challenge so block the request. */
911                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
912                 return PJ_TRUE;
913         }
914
915         return PJ_FALSE;
916 }
917
918 static pjsip_module auth_mod = {
919         .name = {"Request Authenticator", 21},
920         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
921         .on_rx_request = authenticate,
922 };
923
924 static int distribute(void *data)
925 {
926         static pjsip_process_rdata_param param = {
927                 .start_mod = &distributor_mod,
928                 .idx_after_start = 1,
929         };
930         pj_bool_t handled = PJ_FALSE;
931         pjsip_rx_data *rdata = data;
932         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
933         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
934         struct ast_sip_endpoint *endpoint;
935
936         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
937         if (!handled && is_request && !is_ack) {
938                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
939         }
940
941         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
942          * is the only appropriate spot to actually decrement the reference.
943          */
944         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
945         ao2_cleanup(endpoint);
946         pjsip_rx_data_free_cloned(rdata);
947         return 0;
948 }
949
950 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
951 {
952         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
953         if (endpoint) {
954                 ao2_ref(endpoint, +1);
955         }
956         return endpoint;
957 }
958
959 static int suspects_sort(const void *obj, const void *arg, int flags)
960 {
961         const struct unidentified_request *object_left = obj;
962         const struct unidentified_request *object_right = arg;
963         const char *right_key = arg;
964         int cmp;
965
966         switch (flags & OBJ_SEARCH_MASK) {
967         case OBJ_SEARCH_OBJECT:
968                 right_key = object_right->src_name;
969                 /* Fall through */
970         case OBJ_SEARCH_KEY:
971                 cmp = strcmp(object_left->src_name, right_key);
972                 break;
973         case OBJ_SEARCH_PARTIAL_KEY:
974                 cmp = strncmp(object_left->src_name, right_key, strlen(right_key));
975                 break;
976         default:
977                 cmp = 0;
978                 break;
979         }
980         return cmp;
981 }
982
983 static int suspects_compare(void *obj, void *arg, int flags)
984 {
985         const struct unidentified_request *object_left = obj;
986         const struct unidentified_request *object_right = arg;
987         const char *right_key = arg;
988         int cmp = 0;
989
990         switch (flags & OBJ_SEARCH_MASK) {
991         case OBJ_SEARCH_OBJECT:
992                 right_key = object_right->src_name;
993                 /* Fall through */
994         case OBJ_SEARCH_KEY:
995                 if (strcmp(object_left->src_name, right_key) == 0) {
996                         cmp = CMP_MATCH;
997                 }
998                 break;
999         case OBJ_SEARCH_PARTIAL_KEY:
1000                 if (strncmp(object_left->src_name, right_key, strlen(right_key)) == 0) {
1001                         cmp = CMP_MATCH;
1002                 }
1003                 break;
1004         default:
1005                 cmp = 0;
1006                 break;
1007         }
1008         return cmp;
1009 }
1010
1011 static int suspects_hash(const void *obj, int flags)
1012 {
1013         const struct unidentified_request *object;
1014         const char *key;
1015
1016         switch (flags & OBJ_SEARCH_MASK) {
1017         case OBJ_SEARCH_KEY:
1018                 key = obj;
1019                 break;
1020         case OBJ_SEARCH_OBJECT:
1021                 object = obj;
1022                 key = object->src_name;
1023                 break;
1024         default:
1025                 /* Hash can only work on something with a full key. */
1026                 ast_assert(0);
1027                 return 0;
1028         }
1029         return ast_str_hash(key);
1030 }
1031
1032 static struct ao2_container *cli_unid_get_container(const char *regex)
1033 {
1034         struct ao2_container *s_container;
1035
1036         s_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
1037                 suspects_sort, suspects_compare);
1038         if (!s_container) {
1039                 return NULL;
1040         }
1041
1042         if (ao2_container_dup(s_container, unidentified_requests, 0)) {
1043                 ao2_ref(s_container, -1);
1044                 return NULL;
1045         }
1046
1047         return s_container;
1048 }
1049
1050 static int cli_unid_iterate(void *container, ao2_callback_fn callback, void *args)
1051 {
1052         ao2_callback(container, 0, callback, args);
1053
1054         return 0;
1055 }
1056
1057 static void *cli_unid_retrieve_by_id(const char *id)
1058 {
1059         return ao2_find(unidentified_requests, id, OBJ_SEARCH_KEY);
1060 }
1061
1062 static const char *cli_unid_get_id(const void *obj)
1063 {
1064         const struct unidentified_request *unid = obj;
1065
1066         return unid->src_name;
1067 }
1068
1069 static int cli_unid_print_header(void *obj, void *arg, int flags)
1070 {
1071         struct ast_sip_cli_context *context = arg;
1072         RAII_VAR(struct ast_sip_cli_formatter_entry *, formatter_entry, NULL, ao2_cleanup);
1073
1074         int indent = CLI_INDENT_TO_SPACES(context->indent_level);
1075         int filler = CLI_LAST_TABSTOP - indent - 7;
1076
1077         ast_assert(context->output_buffer != NULL);
1078
1079         ast_str_append(&context->output_buffer, 0,
1080                 "%*s:  <IP Address%*.*s>  <Count> <Age(sec)>\n",
1081                 indent, "Request", filler, filler, CLI_HEADER_FILLER);
1082
1083         return 0;
1084 }
1085
1086 static int cli_unid_print_body(void *obj, void *arg, int flags)
1087 {
1088         struct unidentified_request *unid = obj;
1089         struct ast_sip_cli_context *context = arg;
1090         int indent;
1091         int flexwidth;
1092         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
1093
1094         ast_assert(context->output_buffer != NULL);
1095
1096         indent = CLI_INDENT_TO_SPACES(context->indent_level);
1097         flexwidth = CLI_LAST_TABSTOP - 4;
1098
1099         ast_str_append(&context->output_buffer, 0, "%*s:  %-*.*s  %7d %10.3f\n",
1100                 indent,
1101                 "Request",
1102                 flexwidth, flexwidth,
1103                 unid->src_name, unid->count,  ms / 1000.0);
1104
1105         return 0;
1106 }
1107
1108 static struct ast_cli_entry cli_commands[] = {
1109         AST_CLI_DEFINE(ast_sip_cli_traverse_objects, "Show PJSIP Unidentified Requests",
1110                 .command = "pjsip show unidentified_requests",
1111                 .usage = "Usage: pjsip show unidentified_requests\n"
1112                                 "       Show the PJSIP Unidentified Requests\n"),
1113 };
1114
1115 struct ast_sip_cli_formatter_entry *unid_formatter;
1116
1117 static int expire_requests(void *object, void *arg, int flags)
1118 {
1119         struct unidentified_request *unid = object;
1120         int *maxage = arg;
1121         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
1122
1123         if (ms > (*maxage) * 2 * 1000) {
1124                 return CMP_MATCH;
1125         }
1126
1127         return 0;
1128 }
1129
1130 static int prune_task(const void *data)
1131 {
1132         unsigned int maxage;
1133
1134         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
1135         maxage = unidentified_period * 2;
1136         ao2_callback(unidentified_requests, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, expire_requests, &maxage);
1137
1138         return unidentified_prune_interval * 1000;
1139 }
1140
1141 static int clean_task(const void *data)
1142 {
1143         return 0;
1144 }
1145
1146 static void global_loaded(const char *object_type)
1147 {
1148         char default_realm[MAX_REALM_LENGTH + 1];
1149         struct ast_sip_auth *fake_auth;
1150         char *identifier_order;
1151
1152         /* Update using_auth_username */
1153         identifier_order = ast_sip_get_endpoint_identifier_order();
1154         if (identifier_order) {
1155                 char *identify_method;
1156                 char *io_copy = ast_strdupa(identifier_order);
1157                 int new_using = 0;
1158
1159                 ast_free(identifier_order);
1160                 while ((identify_method = ast_strip(strsep(&io_copy, ",")))) {
1161                         if (!strcmp(identify_method, "auth_username")) {
1162                                 new_using = 1;
1163                                 break;
1164                         }
1165                 }
1166                 using_auth_username = new_using;
1167         }
1168
1169         /* Update default_realm of artificial_auth */
1170         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
1171         fake_auth = ast_sip_get_artificial_auth();
1172         if (!fake_auth || strcmp(fake_auth->realm, default_realm)) {
1173                 ao2_cleanup(fake_auth);
1174
1175                 fake_auth = alloc_artificial_auth(default_realm);
1176                 if (fake_auth) {
1177                         ao2_global_obj_replace_unref(artificial_auth, fake_auth);
1178                 }
1179         }
1180         ao2_cleanup(fake_auth);
1181
1182         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
1183
1184         /* Clean out the old task, if any */
1185         ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
1186         /* Have to do something with the return value to shut up the stupid compiler. */
1187         if (ast_sched_add_variable(prune_context, unidentified_prune_interval * 1000, prune_task, NULL, 1) < 0) {
1188                 return;
1189         }
1190 }
1191
1192 /*! \brief Observer which is used to update our interval and default_realm when the global setting changes */
1193 static struct ast_sorcery_observer global_observer = {
1194         .loaded = global_loaded,
1195 };
1196
1197 /*!
1198  * \internal
1199  * \brief Shutdown the serializers in the distributor pool.
1200  * \since 13.10.0
1201  *
1202  * \return Nothing
1203  */
1204 static void distributor_pool_shutdown(void)
1205 {
1206         int idx;
1207
1208         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1209                 ast_taskprocessor_unreference(distributor_pool[idx]);
1210                 distributor_pool[idx] = NULL;
1211         }
1212 }
1213
1214 /*!
1215  * \internal
1216  * \brief Setup the serializers in the distributor pool.
1217  * \since 13.10.0
1218  *
1219  * \retval 0 on success.
1220  * \retval -1 on error.
1221  */
1222 static int distributor_pool_setup(void)
1223 {
1224         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1225         int idx;
1226
1227         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1228                 /* Create name with seq number appended. */
1229                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
1230
1231                 distributor_pool[idx] = ast_sip_create_serializer(tps_name);
1232                 if (!distributor_pool[idx]) {
1233                         return -1;
1234                 }
1235         }
1236         return 0;
1237 }
1238
1239 int ast_sip_initialize_distributor(void)
1240 {
1241         unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1242                 DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
1243         if (!unidentified_requests) {
1244                 return -1;
1245         }
1246
1247         dialog_associations = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1248                 DIALOG_ASSOCIATIONS_BUCKETS, dialog_associations_hash, NULL,
1249                 dialog_associations_cmp);
1250         if (!dialog_associations) {
1251                 ast_sip_destroy_distributor();
1252                 return -1;
1253         }
1254
1255         if (distributor_pool_setup()) {
1256                 ast_sip_destroy_distributor();
1257                 return -1;
1258         }
1259
1260         prune_context = ast_sched_context_create();
1261         if (!prune_context) {
1262                 ast_sip_destroy_distributor();
1263                 return -1;
1264         }
1265
1266         if (ast_sched_start_thread(prune_context)) {
1267                 ast_sip_destroy_distributor();
1268                 return -1;
1269         }
1270
1271         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
1272         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
1273
1274         if (create_artificial_endpoint() || create_artificial_auth()) {
1275                 ast_sip_destroy_distributor();
1276                 return -1;
1277         }
1278
1279         if (ast_sip_register_service(&distributor_mod)) {
1280                 ast_sip_destroy_distributor();
1281                 return -1;
1282         }
1283         if (ast_sip_register_service(&endpoint_mod)) {
1284                 ast_sip_destroy_distributor();
1285                 return -1;
1286         }
1287         if (ast_sip_register_service(&auth_mod)) {
1288                 ast_sip_destroy_distributor();
1289                 return -1;
1290         }
1291
1292         unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
1293                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1294         if (!unid_formatter) {
1295                 ast_sip_destroy_distributor();
1296                 ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
1297                 return -1;
1298         }
1299         unid_formatter->name = "unidentified_request";
1300         unid_formatter->print_header = cli_unid_print_header;
1301         unid_formatter->print_body = cli_unid_print_body;
1302         unid_formatter->get_container = cli_unid_get_container;
1303         unid_formatter->iterate = cli_unid_iterate;
1304         unid_formatter->get_id = cli_unid_get_id;
1305         unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
1306         ast_sip_register_cli_formatter(unid_formatter);
1307
1308         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
1309
1310         return 0;
1311 }
1312
1313 void ast_sip_destroy_distributor(void)
1314 {
1315         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
1316         ast_sip_unregister_cli_formatter(unid_formatter);
1317
1318         ast_sip_unregister_service(&auth_mod);
1319         ast_sip_unregister_service(&endpoint_mod);
1320         ast_sip_unregister_service(&distributor_mod);
1321
1322         ao2_global_obj_release(artificial_auth);
1323         ao2_cleanup(artificial_endpoint);
1324
1325         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
1326
1327         if (prune_context) {
1328                 ast_sched_context_destroy(prune_context);
1329         }
1330
1331         distributor_pool_shutdown();
1332
1333         ao2_cleanup(dialog_associations);
1334         ao2_cleanup(unidentified_requests);
1335 }