Merge "chan_pjsip: Relock correct channel during "fax" redirect."
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "asterisk/acl.h"
25 #include "include/res_pjsip_private.h"
26 #include "asterisk/taskprocessor.h"
27 #include "asterisk/threadpool.h"
28 #include "asterisk/res_pjsip_cli.h"
29
30 static int distribute(void *data);
31 static pj_bool_t distributor(pjsip_rx_data *rdata);
32 static pj_status_t record_serializer(pjsip_tx_data *tdata);
33
34 static pjsip_module distributor_mod = {
35         .name = {"Request Distributor", 19},
36         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
37         .on_tx_request = record_serializer,
38         .on_rx_request = distributor,
39         .on_rx_response = distributor,
40 };
41
42 struct ast_sched_context *prune_context;
43
44 /* From the auth/realm realtime column size */
45 #define MAX_REALM_LENGTH 40
46
47 #define DEFAULT_SUSPECTS_BUCKETS 53
48
49 static struct ao2_container *unidentified_requests;
50 static unsigned int unidentified_count;
51 static unsigned int unidentified_period;
52 static unsigned int unidentified_prune_interval;
53 static int using_auth_username;
54 static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
55
56 struct unidentified_request{
57         struct timeval first_seen;
58         int count;
59         char src_name[];
60 };
61
62 /*! Number of serializers in pool if one not otherwise known.  (Best if prime number) */
63 #define DISTRIBUTOR_POOL_SIZE           31
64
65 /*! Pool of serializers to use if not supplied. */
66 static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
67
68 /*!
69  * \internal
70  * \brief Record the task's serializer name on the tdata structure.
71  * \since 14.0.0
72  *
73  * \param tdata The outgoing message.
74  *
75  * \retval PJ_SUCCESS.
76  */
77 static pj_status_t record_serializer(pjsip_tx_data *tdata)
78 {
79         struct ast_taskprocessor *serializer;
80
81         serializer = ast_threadpool_serializer_get_current();
82         if (serializer) {
83                 const char *name;
84
85                 name = ast_taskprocessor_name(serializer);
86                 if (!ast_strlen_zero(name)
87                         && (!tdata->mod_data[distributor_mod.id]
88                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
89                         char *tdata_name;
90
91                         /* The serializer in use changed. */
92                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
93                         strcpy(tdata_name, name);/* Safe */
94
95                         tdata->mod_data[distributor_mod.id] = tdata_name;
96                 }
97         }
98
99         return PJ_SUCCESS;
100 }
101
102 /*!
103  * \internal
104  * \brief Find the request tdata to get the serializer it used.
105  * \since 14.0.0
106  *
107  * \param rdata The incoming message.
108  *
109  * \retval serializer on success.
110  * \retval NULL on error or could not find the serializer.
111  */
112 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
113 {
114         struct ast_taskprocessor *serializer = NULL;
115         pj_str_t tsx_key;
116         pjsip_transaction *tsx;
117
118         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
119                 &rdata->msg_info.cseq->method, rdata);
120
121         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
122         if (!tsx) {
123                 ast_debug(1, "Could not find transaction for %s.\n",
124                         pjsip_rx_data_get_info(rdata));
125                 return NULL;
126         }
127         ast_debug(3, "Found transaction %s for %s.\n",
128                 tsx->obj_name, pjsip_rx_data_get_info(rdata));
129
130         if (tsx->last_tx) {
131                 const char *serializer_name;
132
133                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
134                 if (!ast_strlen_zero(serializer_name)) {
135                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
136                         if (serializer) {
137                                 ast_debug(3, "Found serializer %s on transaction %s\n",
138                                                 serializer_name, tsx->obj_name);
139                         }
140                 }
141         }
142
143 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
144         pj_grp_lock_release(tsx->grp_lock);
145 #else
146         pj_mutex_unlock(tsx->mutex);
147 #endif
148
149         return serializer;
150 }
151
152 /*! Dialog-specific information the distributor uses */
153 struct distributor_dialog_data {
154         /*! dialog_associations ao2 container key */
155         pjsip_dialog *dlg;
156         /*! Serializer to distribute tasks to for this dialog */
157         struct ast_taskprocessor *serializer;
158         /*! Endpoint associated with this dialog */
159         struct ast_sip_endpoint *endpoint;
160 };
161
162 #define DIALOG_ASSOCIATIONS_BUCKETS 251
163
164 static struct ao2_container *dialog_associations;
165
166 /*!
167  * \internal
168  * \brief Compute a hash value on an arbitrary buffer.
169  * \since 13.17.0
170  *
171  * \param[in] pos The buffer to add to the hash
172  * \param[in] len The buffer length to add to the hash
173  * \param[in] hash The hash value to add to
174  *
175  * \details
176  * This version of the function is for when you need to compute a
177  * hash of more than one buffer.
178  *
179  * This famous hash algorithm was written by Dan Bernstein and is
180  * commonly used.
181  *
182  * \sa http://www.cse.yorku.ca/~oz/hash.html
183  */
184 static int buf_hash_add(const char *pos, size_t len, int hash)
185 {
186         while (len--) {
187                 hash = hash * 33 ^ *pos++;
188         }
189
190         return hash;
191 }
192
193 /*!
194  * \internal
195  * \brief Compute a hash value on an arbitrary buffer.
196  * \since 13.17.0
197  *
198  * \param[in] pos The buffer to add to the hash
199  * \param[in] len The buffer length to add to the hash
200  *
201  * \details
202  * This version of the function is for when you need to compute a
203  * hash of more than one buffer.
204  *
205  * This famous hash algorithm was written by Dan Bernstein and is
206  * commonly used.
207  *
208  * \sa http://www.cse.yorku.ca/~oz/hash.html
209  */
210 static int buf_hash(const char *pos, size_t len)
211 {
212         return buf_hash_add(pos, len, 5381);
213 }
214
215 static int dialog_associations_hash(const void *obj, int flags)
216 {
217         const struct distributor_dialog_data *object;
218         union {
219                 const pjsip_dialog *dlg;
220                 const char buf[sizeof(pjsip_dialog *)];
221         } key;
222
223         switch (flags & OBJ_SEARCH_MASK) {
224         case OBJ_SEARCH_KEY:
225                 key.dlg = obj;
226                 break;
227         case OBJ_SEARCH_OBJECT:
228                 object = obj;
229                 key.dlg = object->dlg;
230                 break;
231         default:
232                 /* Hash can only work on something with a full key. */
233                 ast_assert(0);
234                 return 0;
235         }
236         return ast_str_hash_restrict(buf_hash(key.buf, sizeof(key.buf)));
237 }
238
239 static int dialog_associations_cmp(void *obj, void *arg, int flags)
240 {
241         const struct distributor_dialog_data *object_left = obj;
242         const struct distributor_dialog_data *object_right = arg;
243         const pjsip_dialog *right_key = arg;
244         int cmp = 0;
245
246         switch (flags & OBJ_SEARCH_MASK) {
247         case OBJ_SEARCH_OBJECT:
248                 right_key = object_right->dlg;
249                 /* Fall through */
250         case OBJ_SEARCH_KEY:
251                 if (object_left->dlg == right_key) {
252                         cmp = CMP_MATCH;
253                 }
254                 break;
255         case OBJ_SEARCH_PARTIAL_KEY:
256                 /* There is no such thing for this container. */
257                 ast_assert(0);
258                 break;
259         default:
260                 cmp = 0;
261                 break;
262         }
263         return cmp;
264 }
265
266 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
267 {
268         struct distributor_dialog_data *dist;
269
270         ao2_wrlock(dialog_associations);
271         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
272         if (!dist) {
273                 if (serializer) {
274                         dist = ao2_alloc(sizeof(*dist), NULL);
275                         if (dist) {
276                                 dist->dlg = dlg;
277                                 dist->serializer = serializer;
278                                 ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
279                                 ao2_ref(dist, -1);
280                         }
281                 }
282         } else {
283                 ao2_lock(dist);
284                 dist->serializer = serializer;
285                 if (!dist->serializer && !dist->endpoint) {
286                         ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
287                 }
288                 ao2_unlock(dist);
289                 ao2_ref(dist, -1);
290         }
291         ao2_unlock(dialog_associations);
292 }
293
294 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
295 {
296         struct distributor_dialog_data *dist;
297
298         ao2_wrlock(dialog_associations);
299         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
300         if (!dist) {
301                 if (endpoint) {
302                         dist = ao2_alloc(sizeof(*dist), NULL);
303                         if (dist) {
304                                 dist->dlg = dlg;
305                                 dist->endpoint = endpoint;
306                                 ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
307                                 ao2_ref(dist, -1);
308                         }
309                 }
310         } else {
311                 ao2_lock(dist);
312                 dist->endpoint = endpoint;
313                 if (!dist->serializer && !dist->endpoint) {
314                         ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
315                 }
316                 ao2_unlock(dist);
317                 ao2_ref(dist, -1);
318         }
319         ao2_unlock(dialog_associations);
320 }
321
322 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
323 {
324         struct distributor_dialog_data *dist;
325         struct ast_sip_endpoint *endpoint;
326
327         dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
328         if (dist) {
329                 ao2_lock(dist);
330                 endpoint = ao2_bump(dist->endpoint);
331                 ao2_unlock(dist);
332                 ao2_ref(dist, -1);
333         } else {
334                 endpoint = NULL;
335         }
336         return endpoint;
337 }
338
339 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
340 {
341         pj_str_t tsx_key;
342         pjsip_transaction *tsx;
343         pjsip_dialog *dlg;
344         pj_str_t *local_tag;
345         pj_str_t *remote_tag;
346
347         if (!rdata->msg_info.msg) {
348                 return NULL;
349         }
350
351         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
352                 local_tag = &rdata->msg_info.to->tag;
353                 remote_tag = &rdata->msg_info.from->tag;
354         } else {
355                 local_tag = &rdata->msg_info.from->tag;
356                 remote_tag = &rdata->msg_info.to->tag;
357         }
358
359         /* We can only call the convenient method for
360          *  1) responses
361          *  2) non-CANCEL requests
362          *  3) CANCEL requests with a to-tag
363          */
364         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
365                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
366                         rdata->msg_info.to->tag.slen != 0) {
367                 dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
368                                 remote_tag, PJ_FALSE);
369                 if (dlg) {
370                         return dlg;
371                 }
372         }
373
374         /*
375          * There may still be a matching dialog if this is
376          * 1) an incoming CANCEL request without a to-tag
377          * 2) an incoming response to a dialog-creating request.
378          */
379         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
380                 /* CANCEL requests will need to match the INVITE we initially received. Any
381                  * other request type will either have been matched already or is not in
382                  * dialog
383                  */
384                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
385                                 pjsip_get_invite_method(), rdata);
386         } else {
387                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
388                                 &rdata->msg_info.cseq->method, rdata);
389         }
390
391         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
392         if (!tsx) {
393                 ast_debug(3, "Could not find matching transaction for %s\n",
394                         pjsip_rx_data_get_info(rdata));
395                 return NULL;
396         }
397
398         dlg = pjsip_tsx_get_dlg(tsx);
399
400 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
401         pj_grp_lock_release(tsx->grp_lock);
402 #else
403         pj_mutex_unlock(tsx->mutex);
404 #endif
405
406         return dlg;
407 }
408
409 /*!
410  * \internal
411  * \brief Compute a hash value on a pjlib string
412  * \since 13.10.0
413  *
414  * \param[in] str The pjlib string to add to the hash
415  * \param[in] hash The hash value to add to
416  *
417  * \details
418  * This version of the function is for when you need to compute a
419  * string hash of more than one string.
420  *
421  * This famous hash algorithm was written by Dan Bernstein and is
422  * commonly used.
423  *
424  * \sa http://www.cse.yorku.ca/~oz/hash.html
425  */
426 static int pjstr_hash_add(pj_str_t *str, int hash)
427 {
428         return buf_hash_add(pj_strbuf(str), pj_strlen(str), hash);
429 }
430
431 /*!
432  * \internal
433  * \brief Compute a hash value on a pjlib string
434  * \since 13.10.0
435  *
436  * \param[in] str The pjlib string to hash
437  *
438  * This famous hash algorithm was written by Dan Bernstein and is
439  * commonly used.
440  *
441  * http://www.cse.yorku.ca/~oz/hash.html
442  */
443 static int pjstr_hash(pj_str_t *str)
444 {
445         return pjstr_hash_add(str, 5381);
446 }
447
448 struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
449 {
450         int hash;
451         pj_str_t *remote_tag;
452         struct ast_taskprocessor *serializer;
453
454         if (!rdata->msg_info.msg) {
455                 return NULL;
456         }
457
458         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
459                 remote_tag = &rdata->msg_info.from->tag;
460         } else {
461                 remote_tag = &rdata->msg_info.to->tag;
462         }
463
464         /* Compute the hash from the SIP message call-id and remote-tag */
465         hash = pjstr_hash(&rdata->msg_info.cid->id);
466         hash = pjstr_hash_add(remote_tag, hash);
467         hash = ast_str_hash_restrict(hash);
468
469         serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
470         if (serializer) {
471                 ast_debug(3, "Calculated serializer %s to use for %s\n",
472                         ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
473         }
474         return serializer;
475 }
476
477 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
478
479 static pjsip_module endpoint_mod = {
480         .name = {"Endpoint Identifier", 19},
481         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
482         .on_rx_request = endpoint_lookup,
483 };
484
485 static pj_bool_t distributor(pjsip_rx_data *rdata)
486 {
487         pjsip_dialog *dlg;
488         struct distributor_dialog_data *dist = NULL;
489         struct ast_taskprocessor *serializer = NULL;
490         pjsip_rx_data *clone;
491
492         if (!ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
493                 /*
494                  * Ignore everything until we are fully booted.  Let the
495                  * peer retransmit messages until we are ready.
496                  */
497                 return PJ_TRUE;
498         }
499
500         dlg = find_dialog(rdata);
501         if (dlg) {
502                 ast_debug(3, "Searching for serializer associated with dialog %s for %s\n",
503                         dlg->obj_name, pjsip_rx_data_get_info(rdata));
504                 dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
505                 if (dist) {
506                         ao2_lock(dist);
507                         serializer = ao2_bump(dist->serializer);
508                         ao2_unlock(dist);
509                         if (serializer) {
510                                 ast_debug(3, "Found serializer %s associated with dialog %s\n",
511                                         ast_taskprocessor_name(serializer), dlg->obj_name);
512                         }
513                 }
514         }
515
516         if (serializer) {
517                 /* We have a serializer so we know where to send the message. */
518         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
519                 ast_debug(3, "No dialog serializer for %s.  Using request transaction as basis.\n",
520                         pjsip_rx_data_get_info(rdata));
521                 serializer = find_request_serializer(rdata);
522                 if (!serializer) {
523                         /*
524                          * Pick a serializer for the unmatched response.
525                          * We couldn't determine what serializer originally
526                          * sent the request or the serializer is gone.
527                          */
528                         serializer = ast_sip_get_distributor_serializer(rdata);
529                 }
530         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
531                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
532                 /* We have a BYE or CANCEL request without a serializer. */
533                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
534                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
535                 ao2_cleanup(dist);
536                 return PJ_TRUE;
537         } else {
538                 if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
539                         ast_taskprocessor_alert_get())
540                         || (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
541                         ast_taskprocessor_get_subsystem_alert("pjsip"))) {
542                         /*
543                          * When taskprocessors get backed up, there is a good chance that
544                          * we are being overloaded and need to defer adding new work to
545                          * the system.  To defer the work we will ignore the request and
546                          * rely on the peer's transport layer to retransmit the message.
547                          * We usually work off the overload within a few seconds.
548                          * If transport is non-UDP we send a 503 response instead.
549                          */
550                         switch (rdata->tp_info.transport->key.type) {
551                         case PJSIP_TRANSPORT_UDP6:
552                         case PJSIP_TRANSPORT_UDP:
553                                 ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
554                                         pjsip_rx_data_get_info(rdata));
555                                 break;
556                         default:
557                                 ast_debug(3, "Taskprocessor overload on non-udp transport. Received:'%s'. "
558                                         "Responding with a 503.\n", pjsip_rx_data_get_info(rdata));
559                                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
560                                         PJSIP_SC_SERVICE_UNAVAILABLE, NULL, NULL, NULL);
561                                 break;
562                         }
563                         ao2_cleanup(dist);
564                         return PJ_TRUE;
565                 }
566
567                 /* Pick a serializer for the out-of-dialog request. */
568                 serializer = ast_sip_get_distributor_serializer(rdata);
569         }
570
571         if (pjsip_rx_data_clone(rdata, 0, &clone) != PJ_SUCCESS) {
572                 ast_taskprocessor_unreference(serializer);
573                 ao2_cleanup(dist);
574                 return PJ_TRUE;
575         }
576
577         if (dist) {
578                 ao2_lock(dist);
579                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
580                 ao2_unlock(dist);
581                 ao2_cleanup(dist);
582         }
583
584         if (ast_sip_push_task(serializer, distribute, clone)) {
585                 ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
586                 pjsip_rx_data_free_cloned(clone);
587         }
588
589         ast_taskprocessor_unreference(serializer);
590
591         return PJ_TRUE;
592 }
593
594 static struct ast_sip_auth *alloc_artificial_auth(char *default_realm)
595 {
596         struct ast_sip_auth *fake_auth;
597
598         fake_auth = ast_sorcery_alloc(ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE,
599                 "artificial");
600         if (!fake_auth) {
601                 return NULL;
602         }
603
604         ast_string_field_set(fake_auth, realm, default_realm);
605         ast_string_field_set(fake_auth, auth_user, "");
606         ast_string_field_set(fake_auth, auth_pass, "");
607         fake_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
608
609         return fake_auth;
610 }
611
612 static AO2_GLOBAL_OBJ_STATIC(artificial_auth);
613
614 static int create_artificial_auth(void)
615 {
616         char default_realm[MAX_REALM_LENGTH + 1];
617         struct ast_sip_auth *fake_auth;
618
619         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
620         fake_auth = alloc_artificial_auth(default_realm);
621         if (!fake_auth) {
622                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
623                 return -1;
624         }
625
626         ao2_global_obj_replace_unref(artificial_auth, fake_auth);
627         ao2_ref(fake_auth, -1);
628         return 0;
629 }
630
631 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
632 {
633         return ao2_global_obj_ref(artificial_auth);
634 }
635
636 static struct ast_sip_endpoint *artificial_endpoint = NULL;
637
638 static int create_artificial_endpoint(void)
639 {
640         artificial_endpoint = ast_sorcery_alloc(ast_sip_get_sorcery(), "endpoint", NULL);
641         if (!artificial_endpoint) {
642                 return -1;
643         }
644
645         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
646         /* Pushing a bogus value into the vector will ensure that
647          * the proper size of the vector is returned. This value is
648          * not actually used anywhere
649          */
650         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
651         return 0;
652 }
653
654 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
655 {
656         ao2_ref(artificial_endpoint, +1);
657         return artificial_endpoint;
658 }
659
660 static void log_failed_request(pjsip_rx_data *rdata, char *msg, unsigned int count, unsigned int period)
661 {
662         char from_buf[PJSIP_MAX_URL_SIZE];
663         char callid_buf[PJSIP_MAX_URL_SIZE];
664         char method_buf[PJSIP_MAX_URL_SIZE];
665         char src_addr_buf[AST_SOCKADDR_BUFLEN];
666         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
667         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
668         ast_copy_pj_str(method_buf, &rdata->msg_info.msg->line.req.method.name, PJSIP_MAX_URL_SIZE);
669         if (count) {
670                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s' (callid: %s) - %s"
671                         " after %u tries in %.3f ms\n",
672                         method_buf, from_buf,
673                         pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buf, sizeof(src_addr_buf), 3),
674                         callid_buf, msg, count, period / 1000.0);
675         } else {
676                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s' (callid: %s) - %s\n",
677                         method_buf, from_buf,
678                         pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buf, sizeof(src_addr_buf), 3),
679                         callid_buf, msg);
680         }
681 }
682
683 static void check_endpoint(pjsip_rx_data *rdata, struct unidentified_request *unid,
684         const char *name)
685 {
686         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
687
688         ao2_wrlock(unid);
689         unid->count++;
690
691         if (ms < (unidentified_period * 1000) && unid->count >= unidentified_count) {
692                 log_failed_request(rdata, "No matching endpoint found", unid->count, ms);
693                 ast_sip_report_invalid_endpoint(name, rdata);
694         }
695         ao2_unlock(unid);
696 }
697
698 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint);
699 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint);
700
701 static void apply_acls(pjsip_rx_data *rdata)
702 {
703         struct ast_sip_endpoint *endpoint;
704
705         /* Is the endpoint allowed with the source or contact address? */
706         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
707         if (endpoint != artificial_endpoint
708                 && (apply_endpoint_acl(rdata, endpoint)
709                         || apply_endpoint_contact_acl(rdata, endpoint))) {
710                 ast_debug(1, "Endpoint '%s' not allowed by ACL\n",
711                         ast_sorcery_object_get_id(endpoint));
712
713                 /* Replace the rdata endpoint with the artificial endpoint. */
714                 ao2_replace(rdata->endpt_info.mod_data[endpoint_mod.id], artificial_endpoint);
715         }
716 }
717
718 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
719 {
720         struct ast_sip_endpoint *endpoint;
721         struct unidentified_request *unid;
722         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
723
724         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
725         if (endpoint) {
726                 /*
727                  * ao2_find with OBJ_UNLINK always write locks the container before even searching
728                  * for the object.  Since the majority case is that the object won't be found, do
729                  * the find without OBJ_UNLINK to prevent the unnecessary write lock, then unlink
730                  * if needed.
731                  */
732                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
733                 if (unid) {
734                         ao2_unlink(unidentified_requests, unid);
735                         ao2_ref(unid, -1);
736                 }
737                 apply_acls(rdata);
738                 return PJ_FALSE;
739         }
740
741         endpoint = ast_sip_identify_endpoint(rdata);
742         if (endpoint) {
743                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
744                 if (unid) {
745                         ao2_unlink(unidentified_requests, unid);
746                         ao2_ref(unid, -1);
747                 }
748         }
749
750         if (!endpoint) {
751                 /* always use an artificial endpoint - per discussion no reason
752                    to have "alwaysauthreject" as an option.  It is felt using it
753                    was a bug fix and it is not needed since we are not worried about
754                    breaking old stuff and we really don't want to enable the discovery
755                    of SIP accounts */
756                 endpoint = ast_sip_get_artificial_endpoint();
757         }
758
759         /* endpoint ref held by mod_data[] */
760         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
761
762         if (endpoint == artificial_endpoint && !is_ack) {
763                 char name[AST_UUID_STR_LEN] = "";
764                 pjsip_uri *from = rdata->msg_info.from->uri;
765
766                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
767                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
768                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
769                 }
770
771                 unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
772                 if (unid) {
773                         check_endpoint(rdata, unid, name);
774                         ao2_ref(unid, -1);
775                 } else if (using_auth_username) {
776                         ao2_wrlock(unidentified_requests);
777                         /* Checking again with the write lock held allows us to eliminate the DUPS_REPLACE and sort_fn */
778                         unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name,
779                                 OBJ_SEARCH_KEY | OBJ_NOLOCK);
780                         if (unid) {
781                                 check_endpoint(rdata, unid, name);
782                         } else {
783                                 unid = ao2_alloc_options(sizeof(*unid) + strlen(rdata->pkt_info.src_name) + 1,
784                                         NULL, AO2_ALLOC_OPT_LOCK_RWLOCK);
785                                 if (!unid) {
786                                         ao2_unlock(unidentified_requests);
787                                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
788                                         return PJ_TRUE;
789                                 }
790                                 strcpy(unid->src_name, rdata->pkt_info.src_name); /* Safe */
791                                 unid->first_seen = ast_tvnow();
792                                 unid->count = 1;
793                                 ao2_link_flags(unidentified_requests, unid, OBJ_NOLOCK);
794                         }
795                         ao2_ref(unid, -1);
796                         ao2_unlock(unidentified_requests);
797                 } else {
798                         log_failed_request(rdata, "No matching endpoint found", 0, 0);
799                         ast_sip_report_invalid_endpoint(name, rdata);
800                 }
801         }
802
803         apply_acls(rdata);
804         return PJ_FALSE;
805 }
806
807 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
808 {
809         struct ast_sockaddr addr;
810
811         if (ast_acl_list_is_empty(endpoint->acl)) {
812                 return 0;
813         }
814
815         memset(&addr, 0, sizeof(addr));
816         ast_sockaddr_parse(&addr, rdata->pkt_info.src_name, PARSE_PORT_FORBID);
817         ast_sockaddr_set_port(&addr, rdata->pkt_info.src_port);
818
819         if (ast_apply_acl(endpoint->acl, &addr, "SIP ACL: ") != AST_SENSE_ALLOW) {
820                 log_failed_request(rdata, "Not match Endpoint ACL", 0, 0);
821                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_acl");
822                 return 1;
823         }
824         return 0;
825 }
826
827 static int extract_contact_addr(pjsip_contact_hdr *contact, struct ast_sockaddr **addrs)
828 {
829         pjsip_sip_uri *sip_uri;
830         char host[256];
831
832         if (!contact || contact->star) {
833                 *addrs = NULL;
834                 return 0;
835         }
836         if (!PJSIP_URI_SCHEME_IS_SIP(contact->uri) && !PJSIP_URI_SCHEME_IS_SIPS(contact->uri)) {
837                 *addrs = NULL;
838                 return 0;
839         }
840         sip_uri = pjsip_uri_get_uri(contact->uri);
841         ast_copy_pj_str(host, &sip_uri->host, sizeof(host));
842         return ast_sockaddr_resolve(addrs, host, PARSE_PORT_FORBID, AST_AF_UNSPEC);
843 }
844
845 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
846 {
847         int num_contact_addrs;
848         int forbidden = 0;
849         struct ast_sockaddr *contact_addrs;
850         int i;
851         pjsip_contact_hdr *contact = (pjsip_contact_hdr *)&rdata->msg_info.msg->hdr;
852
853         if (ast_acl_list_is_empty(endpoint->contact_acl)) {
854                 return 0;
855         }
856
857         while ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, contact->next))) {
858                 num_contact_addrs = extract_contact_addr(contact, &contact_addrs);
859                 if (num_contact_addrs <= 0) {
860                         continue;
861                 }
862                 for (i = 0; i < num_contact_addrs; ++i) {
863                         if (ast_apply_acl(endpoint->contact_acl, &contact_addrs[i], "SIP Contact ACL: ") != AST_SENSE_ALLOW) {
864                                 log_failed_request(rdata, "Not match Endpoint Contact ACL", 0, 0);
865                                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_contact_acl");
866                                 forbidden = 1;
867                                 break;
868                         }
869                 }
870                 ast_free(contact_addrs);
871                 if (forbidden) {
872                         /* No use checking other contacts if we already have failed ACL check */
873                         break;
874                 }
875         }
876
877         return forbidden;
878 }
879
880 static pj_bool_t authenticate(pjsip_rx_data *rdata)
881 {
882         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
883         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
884
885         ast_assert(endpoint != NULL);
886
887         if (is_ack) {
888                 return PJ_FALSE;
889         }
890
891         if (ast_sip_requires_authentication(endpoint, rdata)) {
892                 pjsip_tx_data *tdata;
893                 struct unidentified_request *unid;
894
895                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
896                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
897                 case AST_SIP_AUTHENTICATION_CHALLENGE:
898                         /* Send the 401 we created for them */
899                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
900                         if (pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL) != PJ_SUCCESS) {
901                                 pjsip_tx_data_dec_ref(tdata);
902                         }
903                         return PJ_TRUE;
904                 case AST_SIP_AUTHENTICATION_SUCCESS:
905                         /* See note in endpoint_lookup about not holding an unnecessary write lock */
906                         unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY);
907                         if (unid) {
908                                 ao2_unlink(unidentified_requests, unid);
909                                 ao2_ref(unid, -1);
910                         }
911                         ast_sip_report_auth_success(endpoint, rdata);
912                         break;
913                 case AST_SIP_AUTHENTICATION_FAILED:
914                         log_failed_request(rdata, "Failed to authenticate", 0, 0);
915                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
916                         if (pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL) != PJ_SUCCESS) {
917                                 pjsip_tx_data_dec_ref(tdata);
918                         }
919                         return PJ_TRUE;
920                 case AST_SIP_AUTHENTICATION_ERROR:
921                         log_failed_request(rdata, "Error to authenticate", 0, 0);
922                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
923                         pjsip_tx_data_dec_ref(tdata);
924                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
925                         return PJ_TRUE;
926                 }
927                 pjsip_tx_data_dec_ref(tdata);
928         } else if (endpoint == artificial_endpoint) {
929                 /* Uh. Oh.  The artificial endpoint couldn't challenge so block the request. */
930                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
931                 return PJ_TRUE;
932         }
933
934         return PJ_FALSE;
935 }
936
937 static pjsip_module auth_mod = {
938         .name = {"Request Authenticator", 21},
939         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
940         .on_rx_request = authenticate,
941 };
942
943 static int distribute(void *data)
944 {
945         static pjsip_process_rdata_param param = {
946                 .start_mod = &distributor_mod,
947                 .idx_after_start = 1,
948         };
949         pj_bool_t handled = PJ_FALSE;
950         pjsip_rx_data *rdata = data;
951         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
952         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
953         struct ast_sip_endpoint *endpoint;
954
955         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
956         if (!handled && is_request && !is_ack) {
957                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
958         }
959
960         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
961          * is the only appropriate spot to actually decrement the reference.
962          */
963         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
964         ao2_cleanup(endpoint);
965         pjsip_rx_data_free_cloned(rdata);
966         return 0;
967 }
968
969 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
970 {
971         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
972         if (endpoint) {
973                 ao2_ref(endpoint, +1);
974         }
975         return endpoint;
976 }
977
978 static int suspects_sort(const void *obj, const void *arg, int flags)
979 {
980         const struct unidentified_request *object_left = obj;
981         const struct unidentified_request *object_right = arg;
982         const char *right_key = arg;
983         int cmp;
984
985         switch (flags & OBJ_SEARCH_MASK) {
986         case OBJ_SEARCH_OBJECT:
987                 right_key = object_right->src_name;
988                 /* Fall through */
989         case OBJ_SEARCH_KEY:
990                 cmp = strcmp(object_left->src_name, right_key);
991                 break;
992         case OBJ_SEARCH_PARTIAL_KEY:
993                 cmp = strncmp(object_left->src_name, right_key, strlen(right_key));
994                 break;
995         default:
996                 cmp = 0;
997                 break;
998         }
999         return cmp;
1000 }
1001
1002 static int suspects_compare(void *obj, void *arg, int flags)
1003 {
1004         const struct unidentified_request *object_left = obj;
1005         const struct unidentified_request *object_right = arg;
1006         const char *right_key = arg;
1007         int cmp = 0;
1008
1009         switch (flags & OBJ_SEARCH_MASK) {
1010         case OBJ_SEARCH_OBJECT:
1011                 right_key = object_right->src_name;
1012                 /* Fall through */
1013         case OBJ_SEARCH_KEY:
1014                 if (strcmp(object_left->src_name, right_key) == 0) {
1015                         cmp = CMP_MATCH;
1016                 }
1017                 break;
1018         case OBJ_SEARCH_PARTIAL_KEY:
1019                 if (strncmp(object_left->src_name, right_key, strlen(right_key)) == 0) {
1020                         cmp = CMP_MATCH;
1021                 }
1022                 break;
1023         default:
1024                 cmp = 0;
1025                 break;
1026         }
1027         return cmp;
1028 }
1029
1030 static int suspects_hash(const void *obj, int flags)
1031 {
1032         const struct unidentified_request *object;
1033         const char *key;
1034
1035         switch (flags & OBJ_SEARCH_MASK) {
1036         case OBJ_SEARCH_KEY:
1037                 key = obj;
1038                 break;
1039         case OBJ_SEARCH_OBJECT:
1040                 object = obj;
1041                 key = object->src_name;
1042                 break;
1043         default:
1044                 /* Hash can only work on something with a full key. */
1045                 ast_assert(0);
1046                 return 0;
1047         }
1048         return ast_str_hash(key);
1049 }
1050
1051 static struct ao2_container *cli_unid_get_container(const char *regex)
1052 {
1053         struct ao2_container *s_container;
1054
1055         s_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
1056                 suspects_sort, suspects_compare);
1057         if (!s_container) {
1058                 return NULL;
1059         }
1060
1061         if (ao2_container_dup(s_container, unidentified_requests, 0)) {
1062                 ao2_ref(s_container, -1);
1063                 return NULL;
1064         }
1065
1066         return s_container;
1067 }
1068
1069 static int cli_unid_iterate(void *container, ao2_callback_fn callback, void *args)
1070 {
1071         ao2_callback(container, 0, callback, args);
1072
1073         return 0;
1074 }
1075
1076 static void *cli_unid_retrieve_by_id(const char *id)
1077 {
1078         return ao2_find(unidentified_requests, id, OBJ_SEARCH_KEY);
1079 }
1080
1081 static const char *cli_unid_get_id(const void *obj)
1082 {
1083         const struct unidentified_request *unid = obj;
1084
1085         return unid->src_name;
1086 }
1087
1088 static int cli_unid_print_header(void *obj, void *arg, int flags)
1089 {
1090         struct ast_sip_cli_context *context = arg;
1091         RAII_VAR(struct ast_sip_cli_formatter_entry *, formatter_entry, NULL, ao2_cleanup);
1092
1093         int indent = CLI_INDENT_TO_SPACES(context->indent_level);
1094         int filler = CLI_LAST_TABSTOP - indent - 7;
1095
1096         ast_assert(context->output_buffer != NULL);
1097
1098         ast_str_append(&context->output_buffer, 0,
1099                 "%*s:  <IP Address%*.*s>  <Count> <Age(sec)>\n",
1100                 indent, "Request", filler, filler, CLI_HEADER_FILLER);
1101
1102         return 0;
1103 }
1104
1105 static int cli_unid_print_body(void *obj, void *arg, int flags)
1106 {
1107         struct unidentified_request *unid = obj;
1108         struct ast_sip_cli_context *context = arg;
1109         int indent;
1110         int flexwidth;
1111         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
1112
1113         ast_assert(context->output_buffer != NULL);
1114
1115         indent = CLI_INDENT_TO_SPACES(context->indent_level);
1116         flexwidth = CLI_LAST_TABSTOP - 4;
1117
1118         ast_str_append(&context->output_buffer, 0, "%*s:  %-*.*s  %7d %10.3f\n",
1119                 indent,
1120                 "Request",
1121                 flexwidth, flexwidth,
1122                 unid->src_name, unid->count,  ms / 1000.0);
1123
1124         return 0;
1125 }
1126
1127 static struct ast_cli_entry cli_commands[] = {
1128         AST_CLI_DEFINE(ast_sip_cli_traverse_objects, "Show PJSIP Unidentified Requests",
1129                 .command = "pjsip show unidentified_requests",
1130                 .usage = "Usage: pjsip show unidentified_requests\n"
1131                                 "       Show the PJSIP Unidentified Requests\n"),
1132 };
1133
1134 struct ast_sip_cli_formatter_entry *unid_formatter;
1135
1136 static int expire_requests(void *object, void *arg, int flags)
1137 {
1138         struct unidentified_request *unid = object;
1139         int *maxage = arg;
1140         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
1141
1142         if (ms > (*maxage) * 2 * 1000) {
1143                 return CMP_MATCH;
1144         }
1145
1146         return 0;
1147 }
1148
1149 static int prune_task(const void *data)
1150 {
1151         unsigned int maxage;
1152
1153         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
1154         maxage = unidentified_period * 2;
1155         ao2_callback(unidentified_requests, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, expire_requests, &maxage);
1156
1157         return unidentified_prune_interval * 1000;
1158 }
1159
1160 static int clean_task(const void *data)
1161 {
1162         return 0;
1163 }
1164
1165 static void global_loaded(const char *object_type)
1166 {
1167         char default_realm[MAX_REALM_LENGTH + 1];
1168         struct ast_sip_auth *fake_auth;
1169         char *identifier_order;
1170
1171         /* Update using_auth_username */
1172         identifier_order = ast_sip_get_endpoint_identifier_order();
1173         if (identifier_order) {
1174                 char *identify_method;
1175                 char *io_copy = ast_strdupa(identifier_order);
1176                 int new_using = 0;
1177
1178                 ast_free(identifier_order);
1179                 while ((identify_method = ast_strip(strsep(&io_copy, ",")))) {
1180                         if (!strcmp(identify_method, "auth_username")) {
1181                                 new_using = 1;
1182                                 break;
1183                         }
1184                 }
1185                 using_auth_username = new_using;
1186         }
1187
1188         /* Update default_realm of artificial_auth */
1189         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
1190         fake_auth = ast_sip_get_artificial_auth();
1191         if (!fake_auth || strcmp(fake_auth->realm, default_realm)) {
1192                 ao2_cleanup(fake_auth);
1193
1194                 fake_auth = alloc_artificial_auth(default_realm);
1195                 if (fake_auth) {
1196                         ao2_global_obj_replace_unref(artificial_auth, fake_auth);
1197                 }
1198         }
1199         ao2_cleanup(fake_auth);
1200
1201         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
1202
1203         overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
1204
1205         /* Clean out the old task, if any */
1206         ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
1207         /* Have to do something with the return value to shut up the stupid compiler. */
1208         if (ast_sched_add_variable(prune_context, unidentified_prune_interval * 1000, prune_task, NULL, 1) < 0) {
1209                 return;
1210         }
1211 }
1212
1213 /*! \brief Observer which is used to update our interval and default_realm when the global setting changes */
1214 static struct ast_sorcery_observer global_observer = {
1215         .loaded = global_loaded,
1216 };
1217
1218 /*!
1219  * \internal
1220  * \brief Shutdown the serializers in the distributor pool.
1221  * \since 13.10.0
1222  *
1223  * \return Nothing
1224  */
1225 static void distributor_pool_shutdown(void)
1226 {
1227         int idx;
1228
1229         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1230                 ast_taskprocessor_unreference(distributor_pool[idx]);
1231                 distributor_pool[idx] = NULL;
1232         }
1233 }
1234
1235 /*!
1236  * \internal
1237  * \brief Setup the serializers in the distributor pool.
1238  * \since 13.10.0
1239  *
1240  * \retval 0 on success.
1241  * \retval -1 on error.
1242  */
1243 static int distributor_pool_setup(void)
1244 {
1245         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1246         int idx;
1247
1248         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1249                 /* Create name with seq number appended. */
1250                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
1251
1252                 distributor_pool[idx] = ast_sip_create_serializer(tps_name);
1253                 if (!distributor_pool[idx]) {
1254                         return -1;
1255                 }
1256         }
1257         return 0;
1258 }
1259
1260 int ast_sip_initialize_distributor(void)
1261 {
1262         unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1263                 DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
1264         if (!unidentified_requests) {
1265                 return -1;
1266         }
1267
1268         dialog_associations = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1269                 DIALOG_ASSOCIATIONS_BUCKETS, dialog_associations_hash, NULL,
1270                 dialog_associations_cmp);
1271         if (!dialog_associations) {
1272                 ast_sip_destroy_distributor();
1273                 return -1;
1274         }
1275
1276         if (distributor_pool_setup()) {
1277                 ast_sip_destroy_distributor();
1278                 return -1;
1279         }
1280
1281         prune_context = ast_sched_context_create();
1282         if (!prune_context) {
1283                 ast_sip_destroy_distributor();
1284                 return -1;
1285         }
1286
1287         if (ast_sched_start_thread(prune_context)) {
1288                 ast_sip_destroy_distributor();
1289                 return -1;
1290         }
1291
1292         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
1293         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
1294
1295         if (create_artificial_endpoint() || create_artificial_auth()) {
1296                 ast_sip_destroy_distributor();
1297                 return -1;
1298         }
1299
1300         if (ast_sip_register_service(&distributor_mod)) {
1301                 ast_sip_destroy_distributor();
1302                 return -1;
1303         }
1304         if (ast_sip_register_service(&endpoint_mod)) {
1305                 ast_sip_destroy_distributor();
1306                 return -1;
1307         }
1308         if (ast_sip_register_service(&auth_mod)) {
1309                 ast_sip_destroy_distributor();
1310                 return -1;
1311         }
1312
1313         unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
1314                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1315         if (!unid_formatter) {
1316                 ast_sip_destroy_distributor();
1317                 ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
1318                 return -1;
1319         }
1320         unid_formatter->name = "unidentified_request";
1321         unid_formatter->print_header = cli_unid_print_header;
1322         unid_formatter->print_body = cli_unid_print_body;
1323         unid_formatter->get_container = cli_unid_get_container;
1324         unid_formatter->iterate = cli_unid_iterate;
1325         unid_formatter->get_id = cli_unid_get_id;
1326         unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
1327         ast_sip_register_cli_formatter(unid_formatter);
1328
1329         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
1330
1331         return 0;
1332 }
1333
1334 void ast_sip_destroy_distributor(void)
1335 {
1336         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
1337         ast_sip_unregister_cli_formatter(unid_formatter);
1338
1339         ast_sip_unregister_service(&auth_mod);
1340         ast_sip_unregister_service(&endpoint_mod);
1341         ast_sip_unregister_service(&distributor_mod);
1342
1343         ao2_global_obj_release(artificial_auth);
1344         ao2_cleanup(artificial_endpoint);
1345
1346         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
1347
1348         if (prune_context) {
1349                 ast_sched_context_destroy(prune_context);
1350         }
1351
1352         distributor_pool_shutdown();
1353
1354         ao2_cleanup(dialog_associations);
1355         ao2_cleanup(unidentified_requests);
1356 }