9326613785731faad725d9a528bd65c81bda2145
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "asterisk/acl.h"
25 #include "include/res_pjsip_private.h"
26 #include "asterisk/taskprocessor.h"
27 #include "asterisk/threadpool.h"
28 #include "asterisk/res_pjsip_cli.h"
29
30 static int distribute(void *data);
31 static pj_bool_t distributor(pjsip_rx_data *rdata);
32 static pj_status_t record_serializer(pjsip_tx_data *tdata);
33
34 static pjsip_module distributor_mod = {
35         .name = {"Request Distributor", 19},
36         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
37         .on_tx_request = record_serializer,
38         .on_rx_request = distributor,
39         .on_rx_response = distributor,
40 };
41
42 struct ast_sched_context *prune_context;
43
44 /* From the auth/realm realtime column size */
45 #define MAX_REALM_LENGTH 40
46 static char default_realm[MAX_REALM_LENGTH + 1];
47
48 #define DEFAULT_SUSPECTS_BUCKETS 53
49
50 static struct ao2_container *unidentified_requests;
51 static unsigned int unidentified_count;
52 static unsigned int unidentified_period;
53 static unsigned int unidentified_prune_interval;
54 static int using_auth_username;
55
56 struct unidentified_request{
57         struct timeval first_seen;
58         int count;
59         char src_name[];
60 };
61
62 /*! Number of serializers in pool if one not otherwise known.  (Best if prime number) */
63 #define DISTRIBUTOR_POOL_SIZE           31
64
65 /*! Pool of serializers to use if not supplied. */
66 static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
67
68 /*!
69  * \internal
70  * \brief Record the task's serializer name on the tdata structure.
71  * \since 14.0.0
72  *
73  * \param tdata The outgoing message.
74  *
75  * \retval PJ_SUCCESS.
76  */
77 static pj_status_t record_serializer(pjsip_tx_data *tdata)
78 {
79         struct ast_taskprocessor *serializer;
80
81         serializer = ast_threadpool_serializer_get_current();
82         if (serializer) {
83                 const char *name;
84
85                 name = ast_taskprocessor_name(serializer);
86                 if (!ast_strlen_zero(name)
87                         && (!tdata->mod_data[distributor_mod.id]
88                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
89                         char *tdata_name;
90
91                         /* The serializer in use changed. */
92                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
93                         strcpy(tdata_name, name);/* Safe */
94
95                         tdata->mod_data[distributor_mod.id] = tdata_name;
96                 }
97         }
98
99         return PJ_SUCCESS;
100 }
101
102 /*!
103  * \internal
104  * \brief Find the request tdata to get the serializer it used.
105  * \since 14.0.0
106  *
107  * \param rdata The incoming message.
108  *
109  * \retval serializer on success.
110  * \retval NULL on error or could not find the serializer.
111  */
112 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
113 {
114         struct ast_taskprocessor *serializer = NULL;
115         pj_str_t tsx_key;
116         pjsip_transaction *tsx;
117
118         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
119                 &rdata->msg_info.cseq->method, rdata);
120
121         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
122         if (!tsx) {
123                 ast_debug(1, "Could not find %.*s transaction for %d response.\n",
124                         (int) pj_strlen(&rdata->msg_info.cseq->method.name),
125                         pj_strbuf(&rdata->msg_info.cseq->method.name),
126                         rdata->msg_info.msg->line.status.code);
127                 return NULL;
128         }
129
130         if (tsx->last_tx) {
131                 const char *serializer_name;
132
133                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
134                 if (!ast_strlen_zero(serializer_name)) {
135                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
136                         if (serializer) {
137                                 ast_debug(3, "Found serializer %s on transaction %s\n",
138                                                 serializer_name, tsx->obj_name);
139                         }
140                 }
141         }
142
143 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
144         pj_grp_lock_release(tsx->grp_lock);
145 #else
146         pj_mutex_unlock(tsx->mutex);
147 #endif
148
149         return serializer;
150 }
151
152 /*! Dialog-specific information the distributor uses */
153 struct distributor_dialog_data {
154         /*! Serializer to distribute tasks to for this dialog */
155         struct ast_taskprocessor *serializer;
156         /*! Endpoint associated with this dialog */
157         struct ast_sip_endpoint *endpoint;
158 };
159
160 /*!
161  * \internal
162  *
163  * \note Call this with the dialog locked
164  */
165 static struct distributor_dialog_data *distributor_dialog_data_alloc(pjsip_dialog *dlg)
166 {
167         struct distributor_dialog_data *dist;
168
169         dist = PJ_POOL_ZALLOC_T(dlg->pool, struct distributor_dialog_data);
170         pjsip_dlg_set_mod_data(dlg, distributor_mod.id, dist);
171
172         return dist;
173 }
174
175 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
176 {
177         struct distributor_dialog_data *dist;
178         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
179
180         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
181         if (!dist) {
182                 dist = distributor_dialog_data_alloc(dlg);
183         }
184         dist->serializer = serializer;
185 }
186
187 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
188 {
189         struct distributor_dialog_data *dist;
190         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
191
192         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
193         if (!dist) {
194                 dist = distributor_dialog_data_alloc(dlg);
195         }
196         dist->endpoint = endpoint;
197 }
198
199 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
200 {
201         struct distributor_dialog_data *dist;
202         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
203
204         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
205         if (!dist || !dist->endpoint) {
206                 return NULL;
207         }
208         ao2_ref(dist->endpoint, +1);
209         return dist->endpoint;
210 }
211
212 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
213 {
214         pj_str_t tsx_key;
215         pjsip_transaction *tsx;
216         pjsip_dialog *dlg;
217         pj_str_t *local_tag;
218         pj_str_t *remote_tag;
219
220         if (!rdata->msg_info.msg) {
221                 return NULL;
222         }
223
224         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
225                 local_tag = &rdata->msg_info.to->tag;
226                 remote_tag = &rdata->msg_info.from->tag;
227         } else {
228                 local_tag = &rdata->msg_info.from->tag;
229                 remote_tag = &rdata->msg_info.to->tag;
230         }
231
232         /* We can only call the convenient method for
233          *  1) responses
234          *  2) non-CANCEL requests
235          *  3) CANCEL requests with a to-tag
236          */
237         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
238                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
239                         rdata->msg_info.to->tag.slen != 0) {
240                 dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
241                                 remote_tag, PJ_TRUE);
242                 if (dlg) {
243                         return dlg;
244                 }
245         }
246
247         /*
248          * There may still be a matching dialog if this is
249          * 1) an incoming CANCEL request without a to-tag
250          * 2) an incoming response to a dialog-creating request.
251          */
252         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
253                 /* CANCEL requests will need to match the INVITE we initially received. Any
254                  * other request type will either have been matched already or is not in
255                  * dialog
256                  */
257                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
258                                 pjsip_get_invite_method(), rdata);
259         } else {
260                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
261                                 &rdata->msg_info.cseq->method, rdata);
262         }
263
264         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
265         if (!tsx) {
266                 ast_debug(3, "Could not find matching transaction for %s\n",
267                         pjsip_rx_data_get_info(rdata));
268                 return NULL;
269         }
270
271         dlg = pjsip_tsx_get_dlg(tsx);
272
273 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
274         pj_grp_lock_release(tsx->grp_lock);
275 #else
276         pj_mutex_unlock(tsx->mutex);
277 #endif
278
279         if (!dlg) {
280                 return NULL;
281         }
282
283         pjsip_dlg_inc_lock(dlg);
284         return dlg;
285 }
286
287 /*!
288  * \internal
289  * \brief Compute a hash value on a pjlib string
290  * \since 13.10.0
291  *
292  * \param[in] str The pjlib string to add to the hash
293  * \param[in] hash The hash value to add to
294  *
295  * \details
296  * This version of the function is for when you need to compute a
297  * string hash of more than one string.
298  *
299  * This famous hash algorithm was written by Dan Bernstein and is
300  * commonly used.
301  *
302  * \sa http://www.cse.yorku.ca/~oz/hash.html
303  */
304 static int pjstr_hash_add(pj_str_t *str, int hash)
305 {
306         size_t len;
307         const char *pos;
308
309         len = pj_strlen(str);
310         pos = pj_strbuf(str);
311         while (len--) {
312                 hash = hash * 33 ^ *pos++;
313         }
314
315         return hash;
316 }
317
318 /*!
319  * \internal
320  * \brief Compute a hash value on a pjlib string
321  * \since 13.10.0
322  *
323  * \param[in] str The pjlib string to hash
324  *
325  * This famous hash algorithm was written by Dan Bernstein and is
326  * commonly used.
327  *
328  * http://www.cse.yorku.ca/~oz/hash.html
329  */
330 static int pjstr_hash(pj_str_t *str)
331 {
332         return pjstr_hash_add(str, 5381);
333 }
334
335 struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
336 {
337         int hash;
338         pj_str_t *remote_tag;
339         struct ast_taskprocessor *serializer;
340
341         if (!rdata->msg_info.msg) {
342                 return NULL;
343         }
344
345         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
346                 remote_tag = &rdata->msg_info.from->tag;
347         } else {
348                 remote_tag = &rdata->msg_info.to->tag;
349         }
350
351         /* Compute the hash from the SIP message call-id and remote-tag */
352         hash = pjstr_hash(&rdata->msg_info.cid->id);
353         hash = pjstr_hash_add(remote_tag, hash);
354         hash = abs(hash);
355
356         serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
357         if (serializer) {
358                 ast_debug(3, "Calculated serializer %s to use for %s\n",
359                         ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
360         }
361         return serializer;
362 }
363
364 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
365
366 static pjsip_module endpoint_mod = {
367         .name = {"Endpoint Identifier", 19},
368         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
369         .on_rx_request = endpoint_lookup,
370 };
371
372 static pj_bool_t distributor(pjsip_rx_data *rdata)
373 {
374         pjsip_dialog *dlg;
375         struct distributor_dialog_data *dist = NULL;
376         struct ast_taskprocessor *serializer = NULL;
377         pjsip_rx_data *clone;
378
379         if (!ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
380                 /*
381                  * Ignore everything until we are fully booted.  Let the
382                  * peer retransmit messages until we are ready.
383                  */
384                 return PJ_TRUE;
385         }
386
387         dlg = find_dialog(rdata);
388         if (dlg) {
389                 ast_debug(3, "Searching for serializer on dialog %s for %s\n",
390                         dlg->obj_name, pjsip_rx_data_get_info(rdata));
391                 dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
392                 if (dist) {
393                         serializer = ao2_bump(dist->serializer);
394                         if (serializer) {
395                                 ast_debug(3, "Found serializer %s on dialog %s\n",
396                                         ast_taskprocessor_name(serializer), dlg->obj_name);
397                         }
398                 }
399                 pjsip_dlg_dec_lock(dlg);
400         }
401
402         if (serializer) {
403                 /* We have a serializer so we know where to send the message. */
404         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
405                 ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
406                         pjsip_rx_data_get_info(rdata));
407                 serializer = find_request_serializer(rdata);
408                 if (!serializer) {
409                         if (ast_taskprocessor_alert_get()) {
410                                 /* We're overloaded, ignore the unmatched response. */
411                                 ast_debug(3, "Taskprocessor overload alert: Ignoring unmatched '%s'.\n",
412                                         pjsip_rx_data_get_info(rdata));
413                                 return PJ_TRUE;
414                         }
415
416                         /*
417                          * Pick a serializer for the unmatched response.  Maybe
418                          * the stack can figure out what it is for, or we really
419                          * should just toss it regardless.
420                          */
421                         serializer = ast_sip_get_distributor_serializer(rdata);
422                 }
423         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
424                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
425                 /* We have a BYE or CANCEL request without a serializer. */
426                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
427                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
428                 return PJ_TRUE;
429         } else {
430                 if (ast_taskprocessor_alert_get()) {
431                         /*
432                          * When taskprocessors get backed up, there is a good chance that
433                          * we are being overloaded and need to defer adding new work to
434                          * the system.  To defer the work we will ignore the request and
435                          * rely on the peer's transport layer to retransmit the message.
436                          * We usually work off the overload within a few seconds.  The
437                          * alternative is to send back a 503 response to these requests
438                          * and be done with it.
439                          */
440                         ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
441                                 pjsip_rx_data_get_info(rdata));
442                         return PJ_TRUE;
443                 }
444
445                 /* Pick a serializer for the out-of-dialog request. */
446                 serializer = ast_sip_get_distributor_serializer(rdata);
447         }
448
449         pjsip_rx_data_clone(rdata, 0, &clone);
450
451         if (dist) {
452                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
453         }
454
455         if (ast_sip_push_task(serializer, distribute, clone)) {
456                 ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
457                 pjsip_rx_data_free_cloned(clone);
458         }
459
460         ast_taskprocessor_unreference(serializer);
461
462         return PJ_TRUE;
463 }
464
465 static struct ast_sip_auth *artificial_auth;
466
467 static int create_artificial_auth(void)
468 {
469         if (!(artificial_auth = ast_sorcery_alloc(
470                       ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE, "artificial"))) {
471                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
472                 return -1;
473         }
474
475         ast_string_field_set(artificial_auth, realm, default_realm);
476         ast_string_field_set(artificial_auth, auth_user, "");
477         ast_string_field_set(artificial_auth, auth_pass, "");
478         artificial_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
479         return 0;
480 }
481
482 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
483 {
484         ao2_ref(artificial_auth, +1);
485         return artificial_auth;
486 }
487
488 static struct ast_sip_endpoint *artificial_endpoint = NULL;
489
490 static int create_artificial_endpoint(void)
491 {
492         if (!(artificial_endpoint = ast_sorcery_alloc(
493                       ast_sip_get_sorcery(), "endpoint", NULL))) {
494                 return -1;
495         }
496
497         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
498         /* Pushing a bogus value into the vector will ensure that
499          * the proper size of the vector is returned. This value is
500          * not actually used anywhere
501          */
502         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
503         return 0;
504 }
505
506 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
507 {
508         ao2_ref(artificial_endpoint, +1);
509         return artificial_endpoint;
510 }
511
512 static void log_failed_request(pjsip_rx_data *rdata, char *msg, unsigned int count, unsigned int period)
513 {
514         char from_buf[PJSIP_MAX_URL_SIZE];
515         char callid_buf[PJSIP_MAX_URL_SIZE];
516         char method_buf[PJSIP_MAX_URL_SIZE];
517         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
518         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
519         ast_copy_pj_str(method_buf, &rdata->msg_info.msg->line.req.method.name, PJSIP_MAX_URL_SIZE);
520         if (count) {
521                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s:%d' (callid: %s) - %s"
522                         " after %u tries in %.3f ms\n",
523                         method_buf, from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, msg, count, period / 1000.0);
524         } else {
525                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s:%d' (callid: %s) - %s\n",
526                         method_buf, from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, msg);
527         }
528 }
529
530 static void check_endpoint(pjsip_rx_data *rdata, struct unidentified_request *unid,
531         const char *name)
532 {
533         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
534
535         ao2_wrlock(unid);
536         unid->count++;
537
538         if (ms < (unidentified_period * 1000) && unid->count >= unidentified_count) {
539                 log_failed_request(rdata, "No matching endpoint found", unid->count, ms);
540                 ast_sip_report_invalid_endpoint(name, rdata);
541         }
542         ao2_unlock(unid);
543 }
544
545 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
546 {
547         struct ast_sip_endpoint *endpoint;
548         struct unidentified_request *unid;
549         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
550
551         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
552         if (endpoint) {
553                 /*
554                  * ao2_find with OBJ_UNLINK always write locks the container before even searching
555                  * for the object.  Since the majority case is that the object won't be found, do
556                  * the find without OBJ_UNLINK to prevent the unnecessary write lock, then unlink
557                  * if needed.
558                  */
559                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
560                         ao2_unlink(unidentified_requests, unid);
561                         ao2_ref(unid, -1);
562                 }
563                 return PJ_FALSE;
564         }
565
566         endpoint = ast_sip_identify_endpoint(rdata);
567         if (endpoint) {
568                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
569                         ao2_unlink(unidentified_requests, unid);
570                         ao2_ref(unid, -1);
571                 }
572         }
573
574         if (!endpoint) {
575
576                 /* always use an artificial endpoint - per discussion no reason
577                    to have "alwaysauthreject" as an option.  It is felt using it
578                    was a bug fix and it is not needed since we are not worried about
579                    breaking old stuff and we really don't want to enable the discovery
580                    of SIP accounts */
581                 endpoint = ast_sip_get_artificial_endpoint();
582         }
583
584         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
585
586         if (!is_ack) {
587                 char name[AST_UUID_STR_LEN] = "";
588                 pjsip_uri *from = rdata->msg_info.from->uri;
589
590                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
591                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
592                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
593                 }
594
595                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
596                         check_endpoint(rdata, unid, name);
597                         ao2_ref(unid, -1);
598                 } else if (using_auth_username) {
599                         ao2_wrlock(unidentified_requests);
600                         /* The check again with the write lock held allows us to eliminate the DUPS_REPLACE and sort_fn */
601                         if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY | OBJ_NOLOCK))) {
602                                 check_endpoint(rdata, unid, name);
603                         } else {
604                                 unid = ao2_alloc_options(sizeof(*unid) + strlen(rdata->pkt_info.src_name) + 1, NULL,
605                                         AO2_ALLOC_OPT_LOCK_RWLOCK);
606                                 if (!unid) {
607                                         ao2_unlock(unidentified_requests);
608                                         return PJ_TRUE;
609                                 }
610                                 strcpy(unid->src_name, rdata->pkt_info.src_name); /* Safe */
611                                 unid->first_seen = ast_tvnow();
612                                 unid->count = 1;
613                                 ao2_link_flags(unidentified_requests, unid, OBJ_NOLOCK);
614                         }
615                         ao2_ref(unid, -1);
616                         ao2_unlock(unidentified_requests);
617                 } else {
618                         log_failed_request(rdata, "No matching endpoint found", 0, 0);
619                         ast_sip_report_invalid_endpoint(name, rdata);
620                 }
621         }
622         return PJ_FALSE;
623 }
624
625 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
626 {
627         struct ast_sockaddr addr;
628
629         if (ast_acl_list_is_empty(endpoint->acl)) {
630                 return 0;
631         }
632
633         memset(&addr, 0, sizeof(addr));
634         ast_sockaddr_parse(&addr, rdata->pkt_info.src_name, PARSE_PORT_FORBID);
635         ast_sockaddr_set_port(&addr, rdata->pkt_info.src_port);
636
637         if (ast_apply_acl(endpoint->acl, &addr, "SIP ACL: ") != AST_SENSE_ALLOW) {
638                 log_failed_request(rdata, "Not match Endpoint ACL", 0, 0);
639                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_acl");
640                 return 1;
641         }
642         return 0;
643 }
644
645 static int extract_contact_addr(pjsip_contact_hdr *contact, struct ast_sockaddr **addrs)
646 {
647         pjsip_sip_uri *sip_uri;
648         char host[256];
649
650         if (!contact || contact->star) {
651                 *addrs = NULL;
652                 return 0;
653         }
654         if (!PJSIP_URI_SCHEME_IS_SIP(contact->uri) && !PJSIP_URI_SCHEME_IS_SIPS(contact->uri)) {
655                 *addrs = NULL;
656                 return 0;
657         }
658         sip_uri = pjsip_uri_get_uri(contact->uri);
659         ast_copy_pj_str(host, &sip_uri->host, sizeof(host));
660         return ast_sockaddr_resolve(addrs, host, PARSE_PORT_FORBID, AST_AF_UNSPEC);
661 }
662
663 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
664 {
665         int num_contact_addrs;
666         int forbidden = 0;
667         struct ast_sockaddr *contact_addrs;
668         int i;
669         pjsip_contact_hdr *contact = (pjsip_contact_hdr *)&rdata->msg_info.msg->hdr;
670
671         if (ast_acl_list_is_empty(endpoint->contact_acl)) {
672                 return 0;
673         }
674
675         while ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, contact->next))) {
676                 num_contact_addrs = extract_contact_addr(contact, &contact_addrs);
677                 if (num_contact_addrs <= 0) {
678                         continue;
679                 }
680                 for (i = 0; i < num_contact_addrs; ++i) {
681                         if (ast_apply_acl(endpoint->contact_acl, &contact_addrs[i], "SIP Contact ACL: ") != AST_SENSE_ALLOW) {
682                                 log_failed_request(rdata, "Not match Endpoint Contact ACL", 0, 0);
683                                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_contact_acl");
684                                 forbidden = 1;
685                                 break;
686                         }
687                 }
688                 ast_free(contact_addrs);
689                 if (forbidden) {
690                         /* No use checking other contacts if we already have failed ACL check */
691                         break;
692                 }
693         }
694
695         return forbidden;
696 }
697
698 static pj_bool_t authenticate(pjsip_rx_data *rdata)
699 {
700         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
701         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
702
703         ast_assert(endpoint != NULL);
704
705         if (endpoint!=artificial_endpoint) {
706                 if (apply_endpoint_acl(rdata, endpoint) || apply_endpoint_contact_acl(rdata, endpoint)) {
707                         if (!is_ack) {
708                                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
709                         }
710                         return PJ_TRUE;
711                 }
712         }
713
714         if (!is_ack && ast_sip_requires_authentication(endpoint, rdata)) {
715                 pjsip_tx_data *tdata;
716                 struct unidentified_request *unid;
717
718                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
719                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
720                 case AST_SIP_AUTHENTICATION_CHALLENGE:
721                         /* Send the 401 we created for them */
722                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
723                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
724                         return PJ_TRUE;
725                 case AST_SIP_AUTHENTICATION_SUCCESS:
726                         /* See note in endpoint_lookup about not holding an unnecessary write lock */
727                         if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
728                                 ao2_unlink(unidentified_requests, unid);
729                                 ao2_ref(unid, -1);
730                         }
731                         ast_sip_report_auth_success(endpoint, rdata);
732                         pjsip_tx_data_dec_ref(tdata);
733                         return PJ_FALSE;
734                 case AST_SIP_AUTHENTICATION_FAILED:
735                         log_failed_request(rdata, "Failed to authenticate", 0, 0);
736                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
737                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
738                         return PJ_TRUE;
739                 case AST_SIP_AUTHENTICATION_ERROR:
740                         log_failed_request(rdata, "Error to authenticate", 0, 0);
741                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
742                         pjsip_tx_data_dec_ref(tdata);
743                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
744                         return PJ_TRUE;
745                 }
746         }
747
748         return PJ_FALSE;
749 }
750
751 static pjsip_module auth_mod = {
752         .name = {"Request Authenticator", 21},
753         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
754         .on_rx_request = authenticate,
755 };
756
757 static int distribute(void *data)
758 {
759         static pjsip_process_rdata_param param = {
760                 .start_mod = &distributor_mod,
761                 .idx_after_start = 1,
762         };
763         pj_bool_t handled;
764         pjsip_rx_data *rdata = data;
765         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
766         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
767         struct ast_sip_endpoint *endpoint;
768
769         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
770         if (!handled && is_request && !is_ack) {
771                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
772         }
773
774         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
775          * is the only appropriate spot to actually decrement the reference.
776          */
777         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
778         ao2_cleanup(endpoint);
779         pjsip_rx_data_free_cloned(rdata);
780         return 0;
781 }
782
783 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
784 {
785         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
786         if (endpoint) {
787                 ao2_ref(endpoint, +1);
788         }
789         return endpoint;
790 }
791
792 static int suspects_sort(const void *obj, const void *arg, int flags)
793 {
794         const struct unidentified_request *object_left = obj;
795         const struct unidentified_request *object_right = arg;
796         const char *right_key = arg;
797         int cmp;
798
799         switch (flags & OBJ_SEARCH_MASK) {
800         case OBJ_SEARCH_OBJECT:
801                 right_key = object_right->src_name;
802                 /* Fall through */
803         case OBJ_SEARCH_KEY:
804                 cmp = strcmp(object_left->src_name, right_key);
805                 break;
806         case OBJ_SEARCH_PARTIAL_KEY:
807                 cmp = strncmp(object_left->src_name, right_key, strlen(right_key));
808                 break;
809         default:
810                 cmp = 0;
811                 break;
812         }
813         return cmp;
814 }
815
816 static int suspects_compare(void *obj, void *arg, int flags)
817 {
818         const struct unidentified_request *object_left = obj;
819         const struct unidentified_request *object_right = arg;
820         const char *right_key = arg;
821         int cmp = 0;
822
823         switch (flags & OBJ_SEARCH_MASK) {
824         case OBJ_SEARCH_OBJECT:
825                 right_key = object_right->src_name;
826                 /* Fall through */
827         case OBJ_SEARCH_KEY:
828                 if (strcmp(object_left->src_name, right_key) == 0) {
829                         cmp = CMP_MATCH | CMP_STOP;
830                 }
831                 break;
832         case OBJ_SEARCH_PARTIAL_KEY:
833                 if (strncmp(object_left->src_name, right_key, strlen(right_key)) == 0) {
834                         cmp = CMP_MATCH;
835                 }
836                 break;
837         default:
838                 cmp = 0;
839                 break;
840         }
841         return cmp;
842 }
843
844 static int suspects_hash(const void *obj, int flags) {
845         const struct unidentified_request *object_left = obj;
846
847         if (flags & OBJ_SEARCH_OBJECT) {
848                 return ast_str_hash(object_left->src_name);
849         } else if (flags & OBJ_SEARCH_KEY) {
850                 return ast_str_hash(obj);
851         }
852         return -1;
853 }
854
855 static struct ao2_container *cli_unid_get_container(const char *regex)
856 {
857         struct ao2_container *s_container;
858
859         s_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
860                 suspects_sort, suspects_compare);
861         if (!s_container) {
862                 return NULL;
863         }
864
865         if (ao2_container_dup(s_container, unidentified_requests, 0)) {
866                 ao2_ref(s_container, -1);
867                 return NULL;
868         }
869
870         return s_container;
871 }
872
873 static int cli_unid_iterate(void *container, ao2_callback_fn callback, void *args)
874 {
875         ao2_callback(container, 0, callback, args);
876
877         return 0;
878 }
879
880 static void *cli_unid_retrieve_by_id(const char *id)
881 {
882         return ao2_find(unidentified_requests, id, OBJ_SEARCH_KEY);
883 }
884
885 static const char *cli_unid_get_id(const void *obj)
886 {
887         const struct unidentified_request *unid = obj;
888
889         return unid->src_name;
890 }
891
892 static int cli_unid_print_header(void *obj, void *arg, int flags)
893 {
894         struct ast_sip_cli_context *context = arg;
895         RAII_VAR(struct ast_sip_cli_formatter_entry *, formatter_entry, NULL, ao2_cleanup);
896
897         int indent = CLI_INDENT_TO_SPACES(context->indent_level);
898         int filler = CLI_LAST_TABSTOP - indent - 7;
899
900         ast_assert(context->output_buffer != NULL);
901
902         ast_str_append(&context->output_buffer, 0,
903                 "%*s:  <IP Address%*.*s>  <Count> <Age(sec)>\n",
904                 indent, "Request", filler, filler, CLI_HEADER_FILLER);
905
906         return 0;
907 }
908
909 static int cli_unid_print_body(void *obj, void *arg, int flags)
910 {
911         struct unidentified_request *unid = obj;
912         struct ast_sip_cli_context *context = arg;
913         int indent;
914         int flexwidth;
915         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
916
917         ast_assert(context->output_buffer != NULL);
918
919         indent = CLI_INDENT_TO_SPACES(context->indent_level);
920         flexwidth = CLI_LAST_TABSTOP - 4;
921
922         ast_str_append(&context->output_buffer, 0, "%*s:  %-*.*s  %7d %10.3f\n",
923                 indent,
924                 "Request",
925                 flexwidth, flexwidth,
926                 unid->src_name, unid->count,  ms / 1000.0);
927
928         return 0;
929 }
930
931 static struct ast_cli_entry cli_commands[] = {
932         AST_CLI_DEFINE(ast_sip_cli_traverse_objects, "Show PJSIP Unidentified Requests",
933                 .command = "pjsip show unidentified_requests",
934                 .usage = "Usage: pjsip show unidentified_requests\n"
935                                 "       Show the PJSIP Unidentified Requests\n"),
936 };
937
938 struct ast_sip_cli_formatter_entry *unid_formatter;
939
940 static int expire_requests(void *object, void *arg, int flags)
941 {
942         struct unidentified_request *unid = object;
943         int *maxage = arg;
944         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
945
946         if (ms > (*maxage) * 2 * 1000) {
947                 return CMP_MATCH;
948         }
949
950         return 0;
951 }
952
953 static int prune_task(const void *data)
954 {
955         unsigned int maxage;
956
957         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
958         maxage = unidentified_period * 2;
959         ao2_callback(unidentified_requests, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, expire_requests, &maxage);
960
961         return unidentified_prune_interval * 1000;
962 }
963
964 static int clean_task(const void *data)
965 {
966         return 0;
967 }
968
969 static void global_loaded(const char *object_type)
970 {
971         char *identifier_order = ast_sip_get_endpoint_identifier_order();
972         char *io_copy = identifier_order ? ast_strdupa(identifier_order) : NULL;
973         char *identify_method;
974
975         ast_free(identifier_order);
976         using_auth_username = 0;
977         while ((identify_method = ast_strip(strsep(&io_copy, ",")))) {
978                 if (!strcmp(identify_method, "auth_username")) {
979                         using_auth_username = 1;
980                         break;
981                 }
982         }
983
984         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
985         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
986
987         /* Clean out the old task, if any */
988         ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
989         /* Have to do something with the return value to shut up the stupid compiler. */
990         if (ast_sched_add_variable(prune_context, unidentified_prune_interval * 1000, prune_task, NULL, 1) < 0) {
991                 return;
992         }
993 }
994
995 /*! \brief Observer which is used to update our interval and default_realm when the global setting changes */
996 static struct ast_sorcery_observer global_observer = {
997         .loaded = global_loaded,
998 };
999
1000 /*!
1001  * \internal
1002  * \brief Shutdown the serializers in the distributor pool.
1003  * \since 13.10.0
1004  *
1005  * \return Nothing
1006  */
1007 static void distributor_pool_shutdown(void)
1008 {
1009         int idx;
1010
1011         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1012                 ast_taskprocessor_unreference(distributor_pool[idx]);
1013                 distributor_pool[idx] = NULL;
1014         }
1015 }
1016
1017 /*!
1018  * \internal
1019  * \brief Setup the serializers in the distributor pool.
1020  * \since 13.10.0
1021  *
1022  * \retval 0 on success.
1023  * \retval -1 on error.
1024  */
1025 static int distributor_pool_setup(void)
1026 {
1027         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1028         int idx;
1029
1030         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1031                 /* Create name with seq number appended. */
1032                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
1033
1034                 distributor_pool[idx] = ast_sip_create_serializer(tps_name);
1035                 if (!distributor_pool[idx]) {
1036                         return -1;
1037                 }
1038         }
1039         return 0;
1040 }
1041
1042 int ast_sip_initialize_distributor(void)
1043 {
1044         unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1045                 DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
1046         if (!unidentified_requests) {
1047                 return -1;
1048         }
1049
1050         if (distributor_pool_setup()) {
1051                 ast_sip_destroy_distributor();
1052                 return -1;
1053         }
1054
1055         prune_context = ast_sched_context_create();
1056         if (!prune_context) {
1057                 ast_sip_destroy_distributor();
1058                 return -1;
1059         }
1060
1061         if (ast_sched_start_thread(prune_context)) {
1062                 ast_sip_destroy_distributor();
1063                 return -1;
1064         }
1065
1066         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
1067         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
1068
1069         if (create_artificial_endpoint() || create_artificial_auth()) {
1070                 ast_sip_destroy_distributor();
1071                 return -1;
1072         }
1073
1074         if (internal_sip_register_service(&distributor_mod)) {
1075                 ast_sip_destroy_distributor();
1076                 return -1;
1077         }
1078         if (internal_sip_register_service(&endpoint_mod)) {
1079                 ast_sip_destroy_distributor();
1080                 return -1;
1081         }
1082         if (internal_sip_register_service(&auth_mod)) {
1083                 ast_sip_destroy_distributor();
1084                 return -1;
1085         }
1086
1087         unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
1088                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1089         if (!unid_formatter) {
1090                 ast_sip_destroy_distributor();
1091                 ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
1092                 return -1;
1093         }
1094         unid_formatter->name = "unidentified_request";
1095         unid_formatter->print_header = cli_unid_print_header;
1096         unid_formatter->print_body = cli_unid_print_body;
1097         unid_formatter->get_container = cli_unid_get_container;
1098         unid_formatter->iterate = cli_unid_iterate;
1099         unid_formatter->get_id = cli_unid_get_id;
1100         unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
1101         ast_sip_register_cli_formatter(unid_formatter);
1102
1103         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
1104
1105         return 0;
1106 }
1107
1108 void ast_sip_destroy_distributor(void)
1109 {
1110         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
1111         ast_sip_unregister_cli_formatter(unid_formatter);
1112
1113         internal_sip_unregister_service(&auth_mod);
1114         internal_sip_unregister_service(&endpoint_mod);
1115         internal_sip_unregister_service(&distributor_mod);
1116
1117         ao2_cleanup(artificial_auth);
1118         ao2_cleanup(artificial_endpoint);
1119
1120         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
1121
1122         if (prune_context) {
1123                 ast_sched_context_destroy(prune_context);
1124         }
1125
1126         distributor_pool_shutdown();
1127
1128         ao2_cleanup(unidentified_requests);
1129 }