pjsip_distributor.c: Consistently pick a serializer for messages.
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "asterisk/acl.h"
25 #include "include/res_pjsip_private.h"
26 #include "asterisk/taskprocessor.h"
27 #include "asterisk/threadpool.h"
28 #include "asterisk/res_pjsip_cli.h"
29
30 static int distribute(void *data);
31 static pj_bool_t distributor(pjsip_rx_data *rdata);
32 static pj_status_t record_serializer(pjsip_tx_data *tdata);
33
34 static pjsip_module distributor_mod = {
35         .name = {"Request Distributor", 19},
36         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
37         .on_tx_request = record_serializer,
38         .on_rx_request = distributor,
39         .on_rx_response = distributor,
40 };
41
42 struct ast_sched_context *prune_context;
43
44 /* From the auth/realm realtime column size */
45 #define MAX_REALM_LENGTH 40
46 static char default_realm[MAX_REALM_LENGTH + 1];
47
48 #define DEFAULT_SUSPECTS_BUCKETS 53
49
50 static struct ao2_container *unidentified_requests;
51 static unsigned int unidentified_count;
52 static unsigned int unidentified_period;
53 static unsigned int unidentified_prune_interval;
54 static int using_auth_username;
55
56 struct unidentified_request{
57         struct timeval first_seen;
58         int count;
59         char src_name[];
60 };
61
62 /*! Number of serializers in pool if one not otherwise known.  (Best if prime number) */
63 #define DISTRIBUTOR_POOL_SIZE           31
64
65 /*! Pool of serializers to use if not supplied. */
66 static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
67
68 /*!
69  * \internal
70  * \brief Record the task's serializer name on the tdata structure.
71  * \since 14.0.0
72  *
73  * \param tdata The outgoing message.
74  *
75  * \retval PJ_SUCCESS.
76  */
77 static pj_status_t record_serializer(pjsip_tx_data *tdata)
78 {
79         struct ast_taskprocessor *serializer;
80
81         serializer = ast_threadpool_serializer_get_current();
82         if (serializer) {
83                 const char *name;
84
85                 name = ast_taskprocessor_name(serializer);
86                 if (!ast_strlen_zero(name)
87                         && (!tdata->mod_data[distributor_mod.id]
88                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
89                         char *tdata_name;
90
91                         /* The serializer in use changed. */
92                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
93                         strcpy(tdata_name, name);/* Safe */
94
95                         tdata->mod_data[distributor_mod.id] = tdata_name;
96                 }
97         }
98
99         return PJ_SUCCESS;
100 }
101
102 /*!
103  * \internal
104  * \brief Find the request tdata to get the serializer it used.
105  * \since 14.0.0
106  *
107  * \param rdata The incoming message.
108  *
109  * \retval serializer on success.
110  * \retval NULL on error or could not find the serializer.
111  */
112 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
113 {
114         struct ast_taskprocessor *serializer = NULL;
115         pj_str_t tsx_key;
116         pjsip_transaction *tsx;
117
118         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
119                 &rdata->msg_info.cseq->method, rdata);
120
121         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
122         if (!tsx) {
123                 ast_debug(1, "Could not find %.*s transaction for %d response.\n",
124                         (int) pj_strlen(&rdata->msg_info.cseq->method.name),
125                         pj_strbuf(&rdata->msg_info.cseq->method.name),
126                         rdata->msg_info.msg->line.status.code);
127                 return NULL;
128         }
129
130         if (tsx->last_tx) {
131                 const char *serializer_name;
132
133                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
134                 if (!ast_strlen_zero(serializer_name)) {
135                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
136                         if (serializer) {
137                                 ast_debug(3, "Found serializer %s on transaction %s\n",
138                                                 serializer_name, tsx->obj_name);
139                         }
140                 }
141         }
142
143 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
144         pj_grp_lock_release(tsx->grp_lock);
145 #else
146         pj_mutex_unlock(tsx->mutex);
147 #endif
148
149         return serializer;
150 }
151
152 /*! Dialog-specific information the distributor uses */
153 struct distributor_dialog_data {
154         /*! Serializer to distribute tasks to for this dialog */
155         struct ast_taskprocessor *serializer;
156         /*! Endpoint associated with this dialog */
157         struct ast_sip_endpoint *endpoint;
158 };
159
160 /*!
161  * \internal
162  *
163  * \note Call this with the dialog locked
164  */
165 static struct distributor_dialog_data *distributor_dialog_data_alloc(pjsip_dialog *dlg)
166 {
167         struct distributor_dialog_data *dist;
168
169         dist = PJ_POOL_ZALLOC_T(dlg->pool, struct distributor_dialog_data);
170         pjsip_dlg_set_mod_data(dlg, distributor_mod.id, dist);
171
172         return dist;
173 }
174
175 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
176 {
177         struct distributor_dialog_data *dist;
178         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
179
180         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
181         if (!dist) {
182                 dist = distributor_dialog_data_alloc(dlg);
183         }
184         dist->serializer = serializer;
185 }
186
187 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
188 {
189         struct distributor_dialog_data *dist;
190         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
191
192         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
193         if (!dist) {
194                 dist = distributor_dialog_data_alloc(dlg);
195         }
196         dist->endpoint = endpoint;
197 }
198
199 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
200 {
201         struct distributor_dialog_data *dist;
202         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
203
204         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
205         if (!dist || !dist->endpoint) {
206                 return NULL;
207         }
208         ao2_ref(dist->endpoint, +1);
209         return dist->endpoint;
210 }
211
212 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
213 {
214         pj_str_t tsx_key;
215         pjsip_transaction *tsx;
216         pjsip_dialog *dlg;
217         pj_str_t *local_tag;
218         pj_str_t *remote_tag;
219
220         if (!rdata->msg_info.msg) {
221                 return NULL;
222         }
223
224         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
225                 local_tag = &rdata->msg_info.to->tag;
226                 remote_tag = &rdata->msg_info.from->tag;
227         } else {
228                 local_tag = &rdata->msg_info.from->tag;
229                 remote_tag = &rdata->msg_info.to->tag;
230         }
231
232         /* We can only call the convenient method for
233          *  1) responses
234          *  2) non-CANCEL requests
235          *  3) CANCEL requests with a to-tag
236          */
237         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
238                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
239                         rdata->msg_info.to->tag.slen != 0) {
240                 dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
241                                 remote_tag, PJ_TRUE);
242                 if (dlg) {
243                         return dlg;
244                 }
245         }
246
247         /*
248          * There may still be a matching dialog if this is
249          * 1) an incoming CANCEL request without a to-tag
250          * 2) an incoming response to a dialog-creating request.
251          */
252         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
253                 /* CANCEL requests will need to match the INVITE we initially received. Any
254                  * other request type will either have been matched already or is not in
255                  * dialog
256                  */
257                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
258                                 pjsip_get_invite_method(), rdata);
259         } else {
260                 pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
261                                 &rdata->msg_info.cseq->method, rdata);
262         }
263
264         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
265         if (!tsx) {
266                 ast_debug(3, "Could not find matching transaction for %s\n",
267                         pjsip_rx_data_get_info(rdata));
268                 return NULL;
269         }
270
271         dlg = pjsip_tsx_get_dlg(tsx);
272
273 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
274         pj_grp_lock_release(tsx->grp_lock);
275 #else
276         pj_mutex_unlock(tsx->mutex);
277 #endif
278
279         if (!dlg) {
280                 return NULL;
281         }
282
283         pjsip_dlg_inc_lock(dlg);
284         return dlg;
285 }
286
287 /*!
288  * \internal
289  * \brief Compute a hash value on a pjlib string
290  * \since 13.10.0
291  *
292  * \param[in] str The pjlib string to add to the hash
293  * \param[in] hash The hash value to add to
294  *
295  * \details
296  * This version of the function is for when you need to compute a
297  * string hash of more than one string.
298  *
299  * This famous hash algorithm was written by Dan Bernstein and is
300  * commonly used.
301  *
302  * \sa http://www.cse.yorku.ca/~oz/hash.html
303  */
304 static int pjstr_hash_add(pj_str_t *str, int hash)
305 {
306         size_t len;
307         const char *pos;
308
309         len = pj_strlen(str);
310         pos = pj_strbuf(str);
311         while (len--) {
312                 hash = hash * 33 ^ *pos++;
313         }
314
315         return hash;
316 }
317
318 /*!
319  * \internal
320  * \brief Compute a hash value on a pjlib string
321  * \since 13.10.0
322  *
323  * \param[in] str The pjlib string to hash
324  *
325  * This famous hash algorithm was written by Dan Bernstein and is
326  * commonly used.
327  *
328  * http://www.cse.yorku.ca/~oz/hash.html
329  */
330 static int pjstr_hash(pj_str_t *str)
331 {
332         return pjstr_hash_add(str, 5381);
333 }
334
335 struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
336 {
337         int hash;
338         pj_str_t *remote_tag;
339         struct ast_taskprocessor *serializer;
340
341         if (!rdata->msg_info.msg) {
342                 return NULL;
343         }
344
345         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
346                 remote_tag = &rdata->msg_info.from->tag;
347         } else {
348                 remote_tag = &rdata->msg_info.to->tag;
349         }
350
351         /* Compute the hash from the SIP message call-id and remote-tag */
352         hash = pjstr_hash(&rdata->msg_info.cid->id);
353         hash = pjstr_hash_add(remote_tag, hash);
354         hash = abs(hash);
355
356         serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
357         if (serializer) {
358                 ast_debug(3, "Calculated serializer %s to use for %s\n",
359                         ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
360         }
361         return serializer;
362 }
363
364 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
365
366 static pjsip_module endpoint_mod = {
367         .name = {"Endpoint Identifier", 19},
368         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
369         .on_rx_request = endpoint_lookup,
370 };
371
372 #define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3)
373
374 static pj_bool_t distributor(pjsip_rx_data *rdata)
375 {
376         pjsip_dialog *dlg;
377         struct distributor_dialog_data *dist = NULL;
378         struct ast_taskprocessor *serializer = NULL;
379         pjsip_rx_data *clone;
380
381         if (!ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
382                 /*
383                  * Ignore everything until we are fully booted.  Let the
384                  * peer retransmit messages until we are ready.
385                  */
386                 return PJ_TRUE;
387         }
388
389         dlg = find_dialog(rdata);
390         if (dlg) {
391                 ast_debug(3, "Searching for serializer on dialog %s for %s\n",
392                         dlg->obj_name, pjsip_rx_data_get_info(rdata));
393                 dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
394                 if (dist) {
395                         serializer = ao2_bump(dist->serializer);
396                         if (serializer) {
397                                 ast_debug(3, "Found serializer %s on dialog %s\n",
398                                         ast_taskprocessor_name(serializer), dlg->obj_name);
399                         }
400                 }
401                 pjsip_dlg_dec_lock(dlg);
402         }
403
404         if (serializer) {
405                 /* We have a serializer so we know where to send the message. */
406         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
407                 ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
408                         pjsip_rx_data_get_info(rdata));
409                 serializer = find_request_serializer(rdata);
410                 if (!serializer) {
411                         /*
412                          * Pick a serializer for the unmatched response.  Maybe
413                          * the stack can figure out what it is for, or we really
414                          * should just toss it regardless.
415                          */
416                         serializer = ast_sip_get_distributor_serializer(rdata);
417                 }
418         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
419                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
420                 /* We have a BYE or CANCEL request without a serializer. */
421                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
422                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
423                 return PJ_TRUE;
424         } else {
425                 /* Pick a serializer for the out-of-dialog request. */
426                 serializer = ast_sip_get_distributor_serializer(rdata);
427         }
428
429         pjsip_rx_data_clone(rdata, 0, &clone);
430
431         if (dist) {
432                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
433         }
434
435         if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
436                 /* When the threadpool is backed up this much, there is a good chance that we have encountered
437                  * some sort of terrible condition and don't need to be adding more work to the threadpool.
438                  * It's in our best interest to send back a 503 response and be done with it.
439                  */
440                 if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
441                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
442                 }
443                 ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
444                 pjsip_rx_data_free_cloned(clone);
445         } else {
446                 if (ast_sip_push_task(serializer, distribute, clone)) {
447                         ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
448                         pjsip_rx_data_free_cloned(clone);
449                 }
450         }
451
452         ast_taskprocessor_unreference(serializer);
453
454         return PJ_TRUE;
455 }
456
457 static struct ast_sip_auth *artificial_auth;
458
459 static int create_artificial_auth(void)
460 {
461         if (!(artificial_auth = ast_sorcery_alloc(
462                       ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE, "artificial"))) {
463                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
464                 return -1;
465         }
466
467         ast_string_field_set(artificial_auth, realm, default_realm);
468         ast_string_field_set(artificial_auth, auth_user, "");
469         ast_string_field_set(artificial_auth, auth_pass, "");
470         artificial_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
471         return 0;
472 }
473
474 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
475 {
476         ao2_ref(artificial_auth, +1);
477         return artificial_auth;
478 }
479
480 static struct ast_sip_endpoint *artificial_endpoint = NULL;
481
482 static int create_artificial_endpoint(void)
483 {
484         if (!(artificial_endpoint = ast_sorcery_alloc(
485                       ast_sip_get_sorcery(), "endpoint", NULL))) {
486                 return -1;
487         }
488
489         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
490         /* Pushing a bogus value into the vector will ensure that
491          * the proper size of the vector is returned. This value is
492          * not actually used anywhere
493          */
494         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
495         return 0;
496 }
497
498 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
499 {
500         ao2_ref(artificial_endpoint, +1);
501         return artificial_endpoint;
502 }
503
504 static void log_failed_request(pjsip_rx_data *rdata, char *msg, unsigned int count, unsigned int period)
505 {
506         char from_buf[PJSIP_MAX_URL_SIZE];
507         char callid_buf[PJSIP_MAX_URL_SIZE];
508         char method_buf[PJSIP_MAX_URL_SIZE];
509         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
510         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
511         ast_copy_pj_str(method_buf, &rdata->msg_info.msg->line.req.method.name, PJSIP_MAX_URL_SIZE);
512         if (count) {
513                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s:%d' (callid: %s) - %s"
514                         " after %u tries in %.3f ms\n",
515                         method_buf, from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, msg, count, period / 1000.0);
516         } else {
517                 ast_log(LOG_NOTICE, "Request '%s' from '%s' failed for '%s:%d' (callid: %s) - %s\n",
518                         method_buf, from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, msg);
519         }
520 }
521
522 static void check_endpoint(pjsip_rx_data *rdata, struct unidentified_request *unid,
523         const char *name)
524 {
525         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
526
527         ao2_wrlock(unid);
528         unid->count++;
529
530         if (ms < (unidentified_period * 1000) && unid->count >= unidentified_count) {
531                 log_failed_request(rdata, "No matching endpoint found", unid->count, ms);
532                 ast_sip_report_invalid_endpoint(name, rdata);
533         }
534         ao2_unlock(unid);
535 }
536
537 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
538 {
539         struct ast_sip_endpoint *endpoint;
540         struct unidentified_request *unid;
541         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
542
543         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
544         if (endpoint) {
545                 /*
546                  * ao2_find with OBJ_UNLINK always write locks the container before even searching
547                  * for the object.  Since the majority case is that the object won't be found, do
548                  * the find without OBJ_UNLINK to prevent the unnecessary write lock, then unlink
549                  * if needed.
550                  */
551                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
552                         ao2_unlink(unidentified_requests, unid);
553                         ao2_ref(unid, -1);
554                 }
555                 return PJ_FALSE;
556         }
557
558         endpoint = ast_sip_identify_endpoint(rdata);
559         if (endpoint) {
560                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
561                         ao2_unlink(unidentified_requests, unid);
562                         ao2_ref(unid, -1);
563                 }
564         }
565
566         if (!endpoint && !is_ack) {
567                 char name[AST_UUID_STR_LEN] = "";
568                 pjsip_uri *from = rdata->msg_info.from->uri;
569
570                 /* always use an artificial endpoint - per discussion no reason
571                    to have "alwaysauthreject" as an option.  It is felt using it
572                    was a bug fix and it is not needed since we are not worried about
573                    breaking old stuff and we really don't want to enable the discovery
574                    of SIP accounts */
575                 endpoint = ast_sip_get_artificial_endpoint();
576
577                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
578                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
579                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
580                 }
581
582                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
583                         check_endpoint(rdata, unid, name);
584                         ao2_ref(unid, -1);
585                 } else if (using_auth_username) {
586                         ao2_wrlock(unidentified_requests);
587                         /* The check again with the write lock held allows us to eliminate the DUPS_REPLACE and sort_fn */
588                         if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY | OBJ_NOLOCK))) {
589                                 check_endpoint(rdata, unid, name);
590                         } else {
591                                 unid = ao2_alloc_options(sizeof(*unid) + strlen(rdata->pkt_info.src_name) + 1, NULL,
592                                         AO2_ALLOC_OPT_LOCK_RWLOCK);
593                                 if (!unid) {
594                                         ao2_unlock(unidentified_requests);
595                                         return PJ_TRUE;
596                                 }
597                                 strcpy(unid->src_name, rdata->pkt_info.src_name); /* Safe */
598                                 unid->first_seen = ast_tvnow();
599                                 unid->count = 1;
600                                 ao2_link_flags(unidentified_requests, unid, OBJ_NOLOCK);
601                         }
602                         ao2_ref(unid, -1);
603                         ao2_unlock(unidentified_requests);
604                 } else {
605                         log_failed_request(rdata, "No matching endpoint found", 0, 0);
606                         ast_sip_report_invalid_endpoint(name, rdata);
607                 }
608         }
609         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
610         return PJ_FALSE;
611 }
612
613 static int apply_endpoint_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
614 {
615         struct ast_sockaddr addr;
616
617         if (ast_acl_list_is_empty(endpoint->acl)) {
618                 return 0;
619         }
620
621         memset(&addr, 0, sizeof(addr));
622         ast_sockaddr_parse(&addr, rdata->pkt_info.src_name, PARSE_PORT_FORBID);
623         ast_sockaddr_set_port(&addr, rdata->pkt_info.src_port);
624
625         if (ast_apply_acl(endpoint->acl, &addr, "SIP ACL: ") != AST_SENSE_ALLOW) {
626                 log_failed_request(rdata, "Not match Endpoint ACL", 0, 0);
627                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_acl");
628                 return 1;
629         }
630         return 0;
631 }
632
633 static int extract_contact_addr(pjsip_contact_hdr *contact, struct ast_sockaddr **addrs)
634 {
635         pjsip_sip_uri *sip_uri;
636         char host[256];
637
638         if (!contact || contact->star) {
639                 *addrs = NULL;
640                 return 0;
641         }
642         if (!PJSIP_URI_SCHEME_IS_SIP(contact->uri) && !PJSIP_URI_SCHEME_IS_SIPS(contact->uri)) {
643                 *addrs = NULL;
644                 return 0;
645         }
646         sip_uri = pjsip_uri_get_uri(contact->uri);
647         ast_copy_pj_str(host, &sip_uri->host, sizeof(host));
648         return ast_sockaddr_resolve(addrs, host, PARSE_PORT_FORBID, AST_AF_UNSPEC);
649 }
650
651 static int apply_endpoint_contact_acl(pjsip_rx_data *rdata, struct ast_sip_endpoint *endpoint)
652 {
653         int num_contact_addrs;
654         int forbidden = 0;
655         struct ast_sockaddr *contact_addrs;
656         int i;
657         pjsip_contact_hdr *contact = (pjsip_contact_hdr *)&rdata->msg_info.msg->hdr;
658
659         if (ast_acl_list_is_empty(endpoint->contact_acl)) {
660                 return 0;
661         }
662
663         while ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, contact->next))) {
664                 num_contact_addrs = extract_contact_addr(contact, &contact_addrs);
665                 if (num_contact_addrs <= 0) {
666                         continue;
667                 }
668                 for (i = 0; i < num_contact_addrs; ++i) {
669                         if (ast_apply_acl(endpoint->contact_acl, &contact_addrs[i], "SIP Contact ACL: ") != AST_SENSE_ALLOW) {
670                                 log_failed_request(rdata, "Not match Endpoint Contact ACL", 0, 0);
671                                 ast_sip_report_failed_acl(endpoint, rdata, "not_match_endpoint_contact_acl");
672                                 forbidden = 1;
673                                 break;
674                         }
675                 }
676                 ast_free(contact_addrs);
677                 if (forbidden) {
678                         /* No use checking other contacts if we already have failed ACL check */
679                         break;
680                 }
681         }
682
683         return forbidden;
684 }
685
686 static pj_bool_t authenticate(pjsip_rx_data *rdata)
687 {
688         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
689         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
690
691         ast_assert(endpoint != NULL);
692
693         if (endpoint!=artificial_endpoint) {
694                 if (apply_endpoint_acl(rdata, endpoint) || apply_endpoint_contact_acl(rdata, endpoint)) {
695                         if (!is_ack) {
696                                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
697                         }
698                         return PJ_TRUE;
699                 }
700         }
701
702         if (!is_ack && ast_sip_requires_authentication(endpoint, rdata)) {
703                 pjsip_tx_data *tdata;
704                 struct unidentified_request *unid;
705
706                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
707                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
708                 case AST_SIP_AUTHENTICATION_CHALLENGE:
709                         /* Send the 401 we created for them */
710                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
711                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
712                         return PJ_TRUE;
713                 case AST_SIP_AUTHENTICATION_SUCCESS:
714                         /* See note in endpoint_lookup about not holding an unnecessary write lock */
715                         if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
716                                 ao2_unlink(unidentified_requests, unid);
717                                 ao2_ref(unid, -1);
718                         }
719                         ast_sip_report_auth_success(endpoint, rdata);
720                         pjsip_tx_data_dec_ref(tdata);
721                         return PJ_FALSE;
722                 case AST_SIP_AUTHENTICATION_FAILED:
723                         log_failed_request(rdata, "Failed to authenticate", 0, 0);
724                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
725                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
726                         return PJ_TRUE;
727                 case AST_SIP_AUTHENTICATION_ERROR:
728                         log_failed_request(rdata, "Error to authenticate", 0, 0);
729                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
730                         pjsip_tx_data_dec_ref(tdata);
731                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
732                         return PJ_TRUE;
733                 }
734         }
735
736         return PJ_FALSE;
737 }
738
739 static pjsip_module auth_mod = {
740         .name = {"Request Authenticator", 21},
741         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
742         .on_rx_request = authenticate,
743 };
744
745 static int distribute(void *data)
746 {
747         static pjsip_process_rdata_param param = {
748                 .start_mod = &distributor_mod,
749                 .idx_after_start = 1,
750         };
751         pj_bool_t handled;
752         pjsip_rx_data *rdata = data;
753         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
754         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
755         struct ast_sip_endpoint *endpoint;
756
757         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
758         if (!handled && is_request && !is_ack) {
759                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
760         }
761
762         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
763          * is the only appropriate spot to actually decrement the reference.
764          */
765         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
766         ao2_cleanup(endpoint);
767         pjsip_rx_data_free_cloned(rdata);
768         return 0;
769 }
770
771 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
772 {
773         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
774         if (endpoint) {
775                 ao2_ref(endpoint, +1);
776         }
777         return endpoint;
778 }
779
780 static int suspects_sort(const void *obj, const void *arg, int flags)
781 {
782         const struct unidentified_request *object_left = obj;
783         const struct unidentified_request *object_right = arg;
784         const char *right_key = arg;
785         int cmp;
786
787         switch (flags & OBJ_SEARCH_MASK) {
788         case OBJ_SEARCH_OBJECT:
789                 right_key = object_right->src_name;
790                 /* Fall through */
791         case OBJ_SEARCH_KEY:
792                 cmp = strcmp(object_left->src_name, right_key);
793                 break;
794         case OBJ_SEARCH_PARTIAL_KEY:
795                 cmp = strncmp(object_left->src_name, right_key, strlen(right_key));
796                 break;
797         default:
798                 cmp = 0;
799                 break;
800         }
801         return cmp;
802 }
803
804 static int suspects_compare(void *obj, void *arg, int flags)
805 {
806         const struct unidentified_request *object_left = obj;
807         const struct unidentified_request *object_right = arg;
808         const char *right_key = arg;
809         int cmp = 0;
810
811         switch (flags & OBJ_SEARCH_MASK) {
812         case OBJ_SEARCH_OBJECT:
813                 right_key = object_right->src_name;
814                 /* Fall through */
815         case OBJ_SEARCH_KEY:
816                 if (strcmp(object_left->src_name, right_key) == 0) {
817                         cmp = CMP_MATCH | CMP_STOP;
818                 }
819                 break;
820         case OBJ_SEARCH_PARTIAL_KEY:
821                 if (strncmp(object_left->src_name, right_key, strlen(right_key)) == 0) {
822                         cmp = CMP_MATCH;
823                 }
824                 break;
825         default:
826                 cmp = 0;
827                 break;
828         }
829         return cmp;
830 }
831
832 static int suspects_hash(const void *obj, int flags) {
833         const struct unidentified_request *object_left = obj;
834
835         if (flags & OBJ_SEARCH_OBJECT) {
836                 return ast_str_hash(object_left->src_name);
837         } else if (flags & OBJ_SEARCH_KEY) {
838                 return ast_str_hash(obj);
839         }
840         return -1;
841 }
842
843 static struct ao2_container *cli_unid_get_container(const char *regex)
844 {
845         struct ao2_container *s_container;
846
847         s_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
848                 suspects_sort, suspects_compare);
849         if (!s_container) {
850                 return NULL;
851         }
852
853         if (ao2_container_dup(s_container, unidentified_requests, 0)) {
854                 ao2_ref(s_container, -1);
855                 return NULL;
856         }
857
858         return s_container;
859 }
860
861 static int cli_unid_iterate(void *container, ao2_callback_fn callback, void *args)
862 {
863         ao2_callback(container, 0, callback, args);
864
865         return 0;
866 }
867
868 static void *cli_unid_retrieve_by_id(const char *id)
869 {
870         return ao2_find(unidentified_requests, id, OBJ_SEARCH_KEY);
871 }
872
873 static const char *cli_unid_get_id(const void *obj)
874 {
875         const struct unidentified_request *unid = obj;
876
877         return unid->src_name;
878 }
879
880 static int cli_unid_print_header(void *obj, void *arg, int flags)
881 {
882         struct ast_sip_cli_context *context = arg;
883         RAII_VAR(struct ast_sip_cli_formatter_entry *, formatter_entry, NULL, ao2_cleanup);
884
885         int indent = CLI_INDENT_TO_SPACES(context->indent_level);
886         int filler = CLI_LAST_TABSTOP - indent - 7;
887
888         ast_assert(context->output_buffer != NULL);
889
890         ast_str_append(&context->output_buffer, 0,
891                 "%*s:  <IP Address%*.*s>  <Count> <Age(sec)>\n",
892                 indent, "Request", filler, filler, CLI_HEADER_FILLER);
893
894         return 0;
895 }
896
897 static int cli_unid_print_body(void *obj, void *arg, int flags)
898 {
899         struct unidentified_request *unid = obj;
900         struct ast_sip_cli_context *context = arg;
901         int indent;
902         int flexwidth;
903         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
904
905         ast_assert(context->output_buffer != NULL);
906
907         indent = CLI_INDENT_TO_SPACES(context->indent_level);
908         flexwidth = CLI_LAST_TABSTOP - 4;
909
910         ast_str_append(&context->output_buffer, 0, "%*s:  %-*.*s  %7d %10.3f\n",
911                 indent,
912                 "Request",
913                 flexwidth, flexwidth,
914                 unid->src_name, unid->count,  ms / 1000.0);
915
916         return 0;
917 }
918
919 static struct ast_cli_entry cli_commands[] = {
920         AST_CLI_DEFINE(ast_sip_cli_traverse_objects, "Show PJSIP Unidentified Requests",
921                 .command = "pjsip show unidentified_requests",
922                 .usage = "Usage: pjsip show unidentified_requests\n"
923                                 "       Show the PJSIP Unidentified Requests\n"),
924 };
925
926 struct ast_sip_cli_formatter_entry *unid_formatter;
927
928 static int expire_requests(void *object, void *arg, int flags)
929 {
930         struct unidentified_request *unid = object;
931         int *maxage = arg;
932         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
933
934         if (ms > (*maxage) * 2 * 1000) {
935                 return CMP_MATCH;
936         }
937
938         return 0;
939 }
940
941 static int prune_task(const void *data)
942 {
943         unsigned int maxage;
944
945         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
946         maxage = unidentified_period * 2;
947         ao2_callback(unidentified_requests, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, expire_requests, &maxage);
948
949         return unidentified_prune_interval * 1000;
950 }
951
952 static int clean_task(const void *data)
953 {
954         return 0;
955 }
956
957 static void global_loaded(const char *object_type)
958 {
959         char *identifier_order = ast_sip_get_endpoint_identifier_order();
960         char *io_copy = ast_strdupa(identifier_order);
961         char *identify_method;
962
963         ast_free(identifier_order);
964         using_auth_username = 0;
965         while ((identify_method = ast_strip(strsep(&io_copy, ",")))) {
966                 if (!strcmp(identify_method, "auth_username")) {
967                         using_auth_username = 1;
968                         break;
969                 }
970         }
971
972         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
973         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
974
975         /* Clean out the old task, if any */
976         ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
977         if (ast_sched_add_variable(prune_context, unidentified_prune_interval * 1000, prune_task, NULL, 1) < 0) {
978                 return;
979         }
980 }
981
982 /*! \brief Observer which is used to update our interval and default_realm when the global setting changes */
983 static struct ast_sorcery_observer global_observer = {
984         .loaded = global_loaded,
985 };
986
987 /*!
988  * \internal
989  * \brief Shutdown the serializers in the distributor pool.
990  * \since 13.10.0
991  *
992  * \return Nothing
993  */
994 static void distributor_pool_shutdown(void)
995 {
996         int idx;
997
998         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
999                 ast_taskprocessor_unreference(distributor_pool[idx]);
1000                 distributor_pool[idx] = NULL;
1001         }
1002 }
1003
1004 /*!
1005  * \internal
1006  * \brief Setup the serializers in the distributor pool.
1007  * \since 13.10.0
1008  *
1009  * \retval 0 on success.
1010  * \retval -1 on error.
1011  */
1012 static int distributor_pool_setup(void)
1013 {
1014         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1015         int idx;
1016
1017         for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
1018                 /* Create name with seq number appended. */
1019                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
1020
1021                 distributor_pool[idx] = ast_sip_create_serializer(tps_name);
1022                 if (!distributor_pool[idx]) {
1023                         return -1;
1024                 }
1025         }
1026         return 0;
1027 }
1028
1029 int ast_sip_initialize_distributor(void)
1030 {
1031         unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
1032                 DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
1033         if (!unidentified_requests) {
1034                 return -1;
1035         }
1036
1037         if (distributor_pool_setup()) {
1038                 ast_sip_destroy_distributor();
1039                 return -1;
1040         }
1041
1042         prune_context = ast_sched_context_create();
1043         if (!prune_context) {
1044                 ast_sip_destroy_distributor();
1045                 return -1;
1046         }
1047
1048         if (ast_sched_start_thread(prune_context)) {
1049                 ast_sip_destroy_distributor();
1050                 return -1;
1051         }
1052
1053         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
1054         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
1055
1056         if (create_artificial_endpoint() || create_artificial_auth()) {
1057                 ast_sip_destroy_distributor();
1058                 return -1;
1059         }
1060
1061         if (internal_sip_register_service(&distributor_mod)) {
1062                 ast_sip_destroy_distributor();
1063                 return -1;
1064         }
1065         if (internal_sip_register_service(&endpoint_mod)) {
1066                 ast_sip_destroy_distributor();
1067                 return -1;
1068         }
1069         if (internal_sip_register_service(&auth_mod)) {
1070                 ast_sip_destroy_distributor();
1071                 return -1;
1072         }
1073
1074         unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
1075                 AO2_ALLOC_OPT_LOCK_NOLOCK);
1076         if (!unid_formatter) {
1077                 ast_sip_destroy_distributor();
1078                 ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
1079                 return -1;
1080         }
1081         unid_formatter->name = "unidentified_request";
1082         unid_formatter->print_header = cli_unid_print_header;
1083         unid_formatter->print_body = cli_unid_print_body;
1084         unid_formatter->get_container = cli_unid_get_container;
1085         unid_formatter->iterate = cli_unid_iterate;
1086         unid_formatter->get_id = cli_unid_get_id;
1087         unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
1088         ast_sip_register_cli_formatter(unid_formatter);
1089
1090         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
1091
1092         return 0;
1093 }
1094
1095 void ast_sip_destroy_distributor(void)
1096 {
1097         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
1098         ast_sip_unregister_cli_formatter(unid_formatter);
1099
1100         internal_sip_unregister_service(&auth_mod);
1101         internal_sip_unregister_service(&endpoint_mod);
1102         internal_sip_unregister_service(&distributor_mod);
1103
1104         ao2_cleanup(artificial_auth);
1105         ao2_cleanup(artificial_endpoint);
1106
1107         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
1108
1109         if (prune_context) {
1110                 ast_sched_context_destroy(prune_context);
1111         }
1112
1113         distributor_pool_shutdown();
1114
1115         ao2_cleanup(unidentified_requests);
1116 }